This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f81114dbc7f [opt](agent-task) Add a daemon thread to clean up agent
tasks on dead BEs (#57591) (#59261)
f81114dbc7f is described below
commit f81114dbc7f1f49964b4bff9de7ea78a288e2015
Author: Siyang Tang <[email protected]>
AuthorDate: Tue Dec 23 18:15:01 2025 +0800
[opt](agent-task) Add a daemon thread to clean up agent tasks on dead BEs
(#57591) (#59261)
When a BE goes down, the agent tasks sent to it never finish until they time
out. Fix this problem by adding a daemon thread that cleans them up.
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../main/java/org/apache/doris/common/Config.java | 5 ++
.../main/java/org/apache/doris/catalog/Env.java | 7 ++
.../apache/doris/task/AgentTaskCleanupDaemon.java | 83 ++++++++++++++++++++++
.../java/org/apache/doris/task/AgentTaskQueue.java | 13 ++++
.../main/java/org/apache/doris/task/PushTask.java | 2 +-
.../test_sc_fail_when_be_down.groovy | 64 +++++++++++++++++
.../test_sc_success_when_be_down.groovy | 56 +++++++++++++++
7 files changed, 229 insertions(+), 1 deletion(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5e59f74fde8..7f91044f2df 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3718,4 +3718,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static String aws_credentials_provider_version = "v2";
+    // Interval of AgentTaskCleanupDaemon. Read when Env is constructed (this field is not
+    // declared mutable); a value <= 0 means the daemon is never created.
+    @ConfField(description = {
+            "agent tasks 健康检查的时间间隔,默认五分钟,小于等于0时不做健康检查",
+            "agent tasks health check interval, default is five minutes, no health check when less than or equal to 0"
+    })
+    public static long agent_task_health_check_intervals_ms = 5 * 60 * 1000L; // 5 min
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 7247fbd5249..ee473b0c806 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -263,6 +263,7 @@ import org.apache.doris.system.HeartbeatMgr;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskCleanupDaemon;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.CompactionTask;
import org.apache.doris.task.MasterTaskExecutor;
@@ -579,6 +580,8 @@ public class Env {
private StatisticsMetricCollector statisticsMetricCollector;
+    // Stays null when Config.agent_task_health_check_intervals_ms <= 0 at construction time
+    // (see the Env constructor); callers must null-check before use.
+ private AgentTaskCleanupDaemon agentTaskCleanupDaemon;
+
// if a config is relative to a daemon thread. record the relation here.
we will proactively change interval of it.
private final Map<String, Supplier<MasterDaemon>> configtoThreads =
ImmutableMap
.of("dynamic_partition_check_interval_seconds",
this::getDynamicPartitionScheduler);
@@ -838,6 +841,9 @@ public class Env {
this.dictionaryManager = new DictionaryManager();
this.keyManagerStore = new KeyManagerStore();
this.keyManager = KeyManagerFactory.getKeyManager();
+        // Only create the agent-task cleanup daemon when health checks are enabled
+        // (interval > 0); otherwise agentTaskCleanupDaemon is left null.
+ if (Config.agent_task_health_check_intervals_ms > 0) {
+ this.agentTaskCleanupDaemon = new AgentTaskCleanupDaemon();
+ }
}
public static Map<String, Long> getSessionReportTimeMap() {
@@ -1956,6 +1962,7 @@ public class Env {
if (keyManager != null) {
keyManager.init();
}
+        // The daemon is only constructed when agent_task_health_check_intervals_ms > 0;
+        // starting it unconditionally would throw a NullPointerException when checks are disabled.
+        if (agentTaskCleanupDaemon != null) {
+            agentTaskCleanupDaemon.start();
+        }
}
// start threads that should run on all FE
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
new file mode 100644
index 00000000000..bdf87911ec9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+/**
+ * Master daemon that aborts the queued agent tasks of backends that have been seen as
+ * not alive in {@link #MAX_FAILURE_TIMES} consecutive checks.
+ *
+ * <p>Tasks sent to a dead BE would otherwise stay in {@link AgentTaskQueue} until they time out.
+ * Removed tasks are marked finished with {@code TStatusCode.ABORTED}; for {@link PushTask}s the
+ * waiting latch is also counted down with an ABORTED status.
+ */
+public class AgentTaskCleanupDaemon extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(AgentTaskCleanupDaemon.class);
+
+    /** Consecutive "not alive" observations required before a backend's tasks are aborted. */
+    public static final Integer MAX_FAILURE_TIMES = 3;
+
+    private static final String ABORT_MSG = "BE down, this agent task is aborted";
+
+    /** backendId -> number of consecutive checks in which that backend was not alive. */
+    private final Map<Long, Integer> beInactiveCheckFailures = Maps.newHashMap();
+
+    public AgentTaskCleanupDaemon() {
+        super("agent-task-cleanup", Config.agent_task_health_check_intervals_ms);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.info("Begin to clean up inactive agent tasks");
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        infoService.getAllClusterBackends(false).forEach(backend -> {
+            long id = backend.getId();
+            if (backend.isAlive()) {
+                // Reachable again: restart the consecutive-failure count from zero.
+                beInactiveCheckFailures.remove(id);
+                return;
+            }
+            // Keep the side effect out of the map's remapping function: count first, then act.
+            int failureTimes = beInactiveCheckFailures.merge(id, 1, Integer::sum);
+            if (failureTimes >= MAX_FAILURE_TIMES) {
+                removeInactiveBeAgentTasks(id);
+            }
+            LOG.info("Check failure on be={}, times={}", id, failureTimes);
+        });
+
+        LOG.info("Finish to clean up inactive agent tasks");
+    }
+
+    /**
+     * Drops every queued agent task of {@code beId} and marks each one as aborted.
+     *
+     * @param beId id of the backend considered dead
+     */
+    private void removeInactiveBeAgentTasks(long beId) {
+        AgentTaskQueue.removeTask(beId, agentTask -> {
+            if (agentTask instanceof PushTask) {
+                // Release whoever is waiting on this push task's latch instead of letting it time out.
+                PushTask task = (PushTask) agentTask;
+                task.countDownLatchWithStatus(beId, agentTask.getTabletId(),
+                        new Status(TStatusCode.ABORTED, ABORT_MSG));
+            }
+            agentTask.setFinished(true);
+            agentTask.setErrorCode(TStatusCode.ABORTED);
+            agentTask.setErrorMsg(ABORT_MSG);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("BE down, remove agent task: {}", agentTask);
+            }
+        });
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 97e1a3cc676..b61a3aa708d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Consumer;
/**
* Task queue
@@ -96,6 +97,18 @@ public class AgentTaskQueue {
--taskNum;
}
+    /**
+     * Removes every queued task, of every task type, that belongs to {@code backendId}.
+     *
+     * <p>Each task is unlinked from the queue (and the global task counter decremented, like
+     * the other removal methods do) before {@code onTaskRemoved} is invoked on it. The callback
+     * runs while the queue's class-level lock is held.
+     *
+     * @param backendId     backend whose tasks are dropped
+     * @param onTaskRemoved callback invoked once per removed task
+     */
+    public static synchronized void removeTask(long backendId, Consumer<AgentTask> onTaskRemoved) {
+        Map<TTaskType, Map<Long, AgentTask>> tasksOfBackend = AgentTaskQueue.tasks.row(backendId);
+        for (Map<Long, AgentTask> tasksOfType : tasksOfBackend.values()) {
+            Iterator<Map.Entry<Long, AgentTask>> it = tasksOfType.entrySet().iterator();
+            while (it.hasNext()) {
+                AgentTask removed = it.next().getValue();
+                it.remove();
+                // Keep taskNum consistent with the queue contents.
+                --taskNum;
+                onTaskRemoved.accept(removed);
+            }
+        }
+    }
+
/*
* we cannot define a push task with only 'backendId', 'signature' and
'TTaskType'
* add version and TPushType to help
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index 53c7ab96925..cb06f4d27f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -58,7 +58,7 @@ public class PushTask extends AgentTask {
private TPushType pushType;
private List<Predicate> conditions;
// for synchronous delete
- private MarkedCountDownLatch latch;
+    // Typed <Long, Long>; AgentTaskCleanupDaemon calls countDownLatchWithStatus(backendId, tabletId,
+    // status) on push tasks of backends it considers dead.
+ private MarkedCountDownLatch<Long, Long> latch;
// lzop decompress or not
private boolean needDecompress;
diff --git
a/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy
b/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy
new file mode 100644
index 00000000000..411ffa893a1
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_sc_fail_when_be_down", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.beNum = 3
+    options.feNum = 2
+    options.enableDebugPoints()
+    // Run the agent-task cleanup daemon every 5s so tasks on stopped BEs are aborted quickly.
+    options.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
+
+    docker(options) {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        def tblName = "test_sc_fail_when_be_down"
+        sql """ DROP TABLE IF EXISTS ${tblName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tblName} (
+                `k` int NOT NULL,
+                `v0` int NOT NULL,
+                `v1` int NOT NULL
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 24
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 3"
+            )
+        """
+        sql """ INSERT INTO ${tblName} SELECT number, number, number from numbers("number" = "1024") """
+
+        // Slow down alter-tablet processing on every BE so the job is still running when BEs stop.
+        GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+        sql """ ALTER TABLE ${tblName} MODIFY COLUMN v1 VARCHAR(100) """
+        sleep(1000)
+        // Stop 2 of the 3 BEs; the schema change is expected to fail instead of hanging until timeout.
+        cluster.stopBackends(1, 2)
+        sleep(10000)
+        def ret = sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
+        println(ret)
+
+        boolean scFinished = false
+        try {
+            waitForSchemaChangeDone {
+                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
+                time 600
+            }
+            scFinished = true
+        } catch (Throwable ignored) {
+            // Expected path: presumably waitForSchemaChangeDone throws when the job does not finish.
+        }
+        // Assert outside the try block so the catch above cannot swallow the assertion failure.
+        assertFalse(scFinished)
+    }
+}
diff --git
a/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy
b/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy
new file mode 100644
index 00000000000..e5eb2bd3028
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_sc_success_when_be_down", "docker") {
+    // 3 BEs / 2 FEs, non-cloud, with BE debug points available.
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.beNum = 3
+    options.feNum = 2
+    options.enableDebugPoints()
+    // Short cleanup interval so the daemon reacts to the stopped BE within the test's lifetime.
+    options.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
+
+    docker(options) {
+        def tblName = "test_sc_success_when_be_down"
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        // Three-replica table so losing a single BE still leaves two replicas.
+        sql """ DROP TABLE IF EXISTS ${tblName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tblName} (
+                `k` int NOT NULL,
+                `v0` int NOT NULL,
+                `v1` int NOT NULL
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 24
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 3"
+            )
+        """
+        sql """ INSERT INTO ${tblName} SELECT number, number, number from numbers("number" = "1024") """
+
+        // Keep the schema change running long enough for one BE to be stopped mid-way.
+        GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+        sql """ ALTER TABLE ${tblName} MODIFY COLUMN v0 VARCHAR(100) """
+        sleep(3000)
+        cluster.stopBackends(1)
+
+        // With only one BE down the schema change is expected to complete.
+        waitForSchemaChangeDone {
+            sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
+            time 600
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]