This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f81114dbc7f [opt](agent-task) Add a daemon thread to clean up agent 
tasks on dead BEs (#57591) (#59261)
f81114dbc7f is described below

commit f81114dbc7f1f49964b4bff9de7ea78a288e2015
Author: Siyang Tang <[email protected]>
AuthorDate: Tue Dec 23 18:15:01 2025 +0800

    [opt](agent-task) Add a daemon thread to clean up agent tasks on dead BEs 
(#57591) (#59261)
    
    When a BE goes down, its agent tasks never finish until they time out.
    Fix this problem by adding a daemon thread that cleans them up.
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../main/java/org/apache/doris/common/Config.java  |  5 ++
 .../main/java/org/apache/doris/catalog/Env.java    |  7 ++
 .../apache/doris/task/AgentTaskCleanupDaemon.java  | 83 ++++++++++++++++++++++
 .../java/org/apache/doris/task/AgentTaskQueue.java | 13 ++++
 .../main/java/org/apache/doris/task/PushTask.java  |  2 +-
 .../test_sc_fail_when_be_down.groovy               | 64 +++++++++++++++++
 .../test_sc_success_when_be_down.groovy            | 56 +++++++++++++++
 7 files changed, 229 insertions(+), 1 deletion(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5e59f74fde8..7f91044f2df 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3718,4 +3718,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static String aws_credentials_provider_version = "v2";
 
+    @ConfField(description = {
+            "agent tasks 健康检查的时间间隔,默认五分钟,小于等于0时不做健康检查",
+            "agent tasks health check interval, default is five minutes, no 
health check when less than or equal to 0"
+    })
+    public static long agent_task_health_check_intervals_ms = 5 * 60 * 1000L; 
// 5 min
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 7247fbd5249..ee473b0c806 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -263,6 +263,7 @@ import org.apache.doris.system.HeartbeatMgr;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.system.SystemInfoService.HostInfo;
 import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskCleanupDaemon;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.CompactionTask;
 import org.apache.doris.task.MasterTaskExecutor;
@@ -579,6 +580,8 @@ public class Env {
 
     private StatisticsMetricCollector statisticsMetricCollector;
 
+    private AgentTaskCleanupDaemon agentTaskCleanupDaemon;
+
     // if a config is relative to a daemon thread. record the relation here. 
we will proactively change interval of it.
     private final Map<String, Supplier<MasterDaemon>> configtoThreads = 
ImmutableMap
             .of("dynamic_partition_check_interval_seconds", 
this::getDynamicPartitionScheduler);
@@ -838,6 +841,9 @@ public class Env {
         this.dictionaryManager = new DictionaryManager();
         this.keyManagerStore = new KeyManagerStore();
         this.keyManager = KeyManagerFactory.getKeyManager();
+        if (Config.agent_task_health_check_intervals_ms > 0) {
+            this.agentTaskCleanupDaemon = new AgentTaskCleanupDaemon();
+        }
     }
 
     public static Map<String, Long> getSessionReportTimeMap() {
@@ -1956,6 +1962,7 @@ public class Env {
         if (keyManager != null) {
             keyManager.init();
         }
+        // The cleanup daemon is only constructed when agent_task_health_check_intervals_ms > 0,
+        // so it is null when health checking is disabled; starting it unguarded would throw NPE.
+        if (agentTaskCleanupDaemon != null) {
+            agentTaskCleanupDaemon.start();
+        }
     }
 
     // start threads that should run on all FE
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
new file mode 100644
index 00000000000..bdf87911ec9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+/**
+ * Daemon that periodically scans all backends and aborts the agent tasks queued for
+ * backends that were found not alive in several consecutive checks.
+ *
+ * <p>The run interval is {@code Config.agent_task_health_check_intervals_ms}. A backend's tasks
+ * are removed once it has failed {@link #MAX_FAILURE_TIMES} consecutive checks, and again on
+ * every later check for as long as it stays not alive.
+ */
+public class AgentTaskCleanupDaemon extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(AgentTaskCleanupDaemon.class);
+
+    /** Number of consecutive failed liveness checks after which a backend's agent tasks are aborted. */
+    public static final Integer MAX_FAILURE_TIMES = 3;
+
+    /** Backend id -> number of consecutive checks in which that backend was not alive. */
+    private final Map<Long, Integer> beInactiveCheckFailures = Maps.newHashMap();
+
+    public AgentTaskCleanupDaemon() {
+        super("agent-task-cleanup", Config.agent_task_health_check_intervals_ms);
+    }
+
+    /**
+     * Runs one check round over all backends: an alive backend has its failure counter cleared,
+     * a backend that is not alive has its counter incremented and, once the counter reaches
+     * {@link #MAX_FAILURE_TIMES}, all of its queued agent tasks are aborted.
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.info("Begin to clean up inactive agent tasks");
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        infoService.getAllClusterBackends(false).forEach(backend -> {
+            long id = backend.getId();
+            if (backend.isAlive()) {
+                // An alive backend resets its failure streak.
+                beInactiveCheckFailures.remove(id);
+                return;
+            }
+            int failureTimes = beInactiveCheckFailures.merge(id, 1, Integer::sum);
+            if (failureTimes >= MAX_FAILURE_TIMES) {
+                removeInactiveBeAgentTasks(id);
+            }
+            // Log the backend id together with its current failure count.
+            LOG.info("Check failure on be={}, times={}", id, failureTimes);
+        });
+
+        LOG.info("Finish to clean up inactive agent tasks");
+    }
+
+    /**
+     * Removes every queued agent task of the given backend and marks each one as finished with
+     * status {@code ABORTED}.
+     *
+     * @param beId id of the backend whose tasks are aborted
+     */
+    private void removeInactiveBeAgentTasks(Long beId) {
+        AgentTaskQueue.removeTask(beId, (agentTask -> {
+            String errMsg = "BE down, this agent task is aborted";
+            if (agentTask instanceof PushTask) {
+                // Push tasks carry a latch; report the abort through it for this backend/tablet.
+                PushTask task = ((PushTask) agentTask);
+                task.countDownLatchWithStatus(beId, agentTask.getTabletId(),
+                        new Status(TStatusCode.ABORTED, errMsg));
+            }
+            agentTask.setFinished(true);
+            agentTask.setErrorCode(TStatusCode.ABORTED);
+            agentTask.setErrorMsg(errMsg);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("BE down, remove agent task: {}", agentTask);
+            }
+        }));
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 97e1a3cc676..b61a3aa708d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -33,6 +33,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 
 /**
  * Task queue
@@ -96,6 +97,18 @@ public class AgentTaskQueue {
         --taskNum;
     }
 
+    /**
+     * Removes every queued task of the given backend, whatever its task type, and hands each
+     * removed task to {@code onTaskRemoved}.
+     *
+     * <p>The callback runs while this class's lock is held, so it should be short.
+     *
+     * @param backendId id of the backend whose tasks are dropped
+     * @param onTaskRemoved invoked once per task, after that task has been taken out of the queue
+     */
+    public static synchronized void removeTask(long backendId, Consumer<AgentTask> onTaskRemoved) {
+        Map<TTaskType, Map<Long, AgentTask>> tasksByType = tasks.row(backendId);
+        for (Map<Long, AgentTask> taskMap : tasksByType.values()) {
+            Iterator<Map.Entry<Long, AgentTask>> it = taskMap.entrySet().iterator();
+            while (it.hasNext()) {
+                AgentTask removed = it.next().getValue();
+                it.remove();
+                // Keep the global counter in step with the table, mirroring the --taskNum done
+                // by the existing removal code in this class.
+                --taskNum;
+                onTaskRemoved.accept(removed);
+            }
+        }
+    }
+
     /*
      * we cannot define a push task with only 'backendId', 'signature' and 
'TTaskType'
      * add version and TPushType to help
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index 53c7ab96925..cb06f4d27f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -58,7 +58,7 @@ public class PushTask extends AgentTask {
     private TPushType pushType;
     private List<Predicate> conditions;
     // for synchronous delete
-    private MarkedCountDownLatch latch;
+    private MarkedCountDownLatch<Long, Long> latch;
 
     // lzop decompress or not
     private boolean needDecompress;
diff --git 
a/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy 
b/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy
new file mode 100644
index 00000000000..411ffa893a1
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+// Verifies that a schema change does NOT finish when two of the three BEs are stopped while
+// the job is running.
+suite("test_sc_fail_when_be_down", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.beNum = 3
+    options.feNum = 2
+    options.enableDebugPoints()
+    // Run the FE agent task health check every 5 seconds instead of the default interval.
+    options.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
+
+    docker(options) {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        def tblName = "test_sc_fail_when_be_down"
+        sql """ DROP TABLE IF EXISTS ${tblName} """
+        sql """
+                CREATE TABLE IF NOT EXISTS ${tblName} (
+                    `k` int NOT NULL,
+                    `v0` int NOT NULL,
+                    `v1` int NOT NULL
+                )
+                DUPLICATE KEY(`k`)
+                DISTRIBUTED BY HASH(`k`) BUCKETS 24
+                PROPERTIES (
+                    "replication_allocation" = "tag.location.default: 3"
+                )
+        """
+        sql """ INSERT INTO ${tblName} SELECT number, number, number from numbers("number" = "1024") """
+
+        // Presumably makes the BE-side alter-tablet step sleep so the job is still in flight
+        // when the backends are stopped -- verify against the BE debug point.
+        GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+        sql """ ALTER TABLE ${tblName} MODIFY COLUMN v1 VARCHAR(100) """
+        sleep(1000)
+        // Stop two of the three backends (the table has three replicas).
+        cluster.stopBackends(1, 2)
+        sleep(10000)
+        // Diagnostic output only; query the table this suite actually created.
+        def ret = sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
+        println(ret)
+
+        // Record the outcome outside the try block so that an unexpected success fails the test
+        // instead of being swallowed by the catch.
+        def schemaChangeDone = false
+        try {
+            waitForSchemaChangeDone {
+                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
+                time 600
+            }
+            schemaChangeDone = true
+        } catch (Throwable t) {
+            println("schema change did not finish, as expected: ${t.message}")
+        }
+        assertTrue(!schemaChangeDone)
+    }
+}
diff --git 
a/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy 
b/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy
new file mode 100644
index 00000000000..e5eb2bd3028
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+// Checks that a schema change still completes when one of the three BEs is stopped while the
+// job is running.
+suite("test_sc_success_when_be_down", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.beNum = 3
+    options.feNum = 2
+    options.enableDebugPoints()
+    // Run the FE agent task health check every 5 seconds instead of the default interval.
+    options.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
+
+    docker(options) {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        def tblName = "test_sc_success_when_be_down"
+        sql """ DROP TABLE IF EXISTS ${tblName} """
+        sql """
+                CREATE TABLE IF NOT EXISTS ${tblName} (
+                    `k` int NOT NULL,
+                    `v0` int NOT NULL,
+                    `v1` int NOT NULL
+                ) 
+                DUPLICATE KEY(`k`)
+                DISTRIBUTED BY HASH(`k`) BUCKETS 24
+                PROPERTIES (
+                    "replication_allocation" = "tag.location.default: 3"
+                )
+        """
+        sql """ INSERT INTO ${tblName} SELECT number, number, number from 
numbers("number" = "1024") """
+
+        // Presumably makes the BE-side alter-tablet step sleep so the job is still in flight
+        // when the backend is stopped -- verify against the BE debug point.
+        
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+        sql """ ALTER TABLE ${tblName} MODIFY COLUMN v0 VARCHAR(100) """
+        sleep(3000)
+        // Stop a single backend; the table has three replicas.
+        cluster.stopBackends(1)
+        // Wait up to the configured time for the latest schema change job on this table.
+        waitForSchemaChangeDone {
+        sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY 
createtime DESC LIMIT 1 """
+        time 600
+}
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to