This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d4a6cafc61 [Feature-17931] Support configurable maximum runtime for
workflow/task instance (#17932)
d4a6cafc61 is described below
commit d4a6cafc61ece78d98b9a21157709652460f52ca
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Feb 13 16:29:56 2026 +0800
[Feature-17931] Support configurable maximum runtime for workflow/task
instance (#17932)
---
deploy/kubernetes/dolphinscheduler/README.md | 2 +
deploy/kubernetes/dolphinscheduler/values.yaml | 4 +
docs/docs/en/architecture/configuration.md | 38 ++++-----
docs/docs/zh/architecture/configuration.md | 34 ++++----
.../server/master/config/MasterConfig.java | 1 +
.../config/MasterServerLoadProtectionConfig.java | 23 ++++++
.../lifecycle/event/TaskTimeoutLifecycleEvent.java | 20 +++--
.../handler/TaskStartLifecycleEventHandler.java | 19 ++++-
.../handler/TaskTimeoutLifecycleEventHandler.java | 4 +-
.../src/main/resources/application.yaml | 4 +
.../integration/cases/WorkflowStartTestCase.java | 33 ++++++++
.../src/test/resources/application.yaml | 4 +
.../workflow_with_system_timeout_kill_task.yaml | 92 ++++++++++++++++++++++
13 files changed, 230 insertions(+), 48 deletions(-)
diff --git a/deploy/kubernetes/dolphinscheduler/README.md
b/deploy/kubernetes/dolphinscheduler/README.md
index 0ca2806044..bfa4da6b50 100644
--- a/deploy/kubernetes/dolphinscheduler/README.md
+++ b/deploy/kubernetes/dolphinscheduler/README.md
@@ -215,6 +215,8 @@ Please refer to the [Quick Start in
Kubernetes](../../../docs/docs/en/guide/inst
|
master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_JVM_CPU_USAGE_PERCENTAGE_THRESHOLDS
| float | `0.7` | Master max jvm cpu usage, when the master's jvm cpu usage is
smaller then this value, master server can execute workflow. |
|
master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_CPU_USAGE_PERCENTAGE_THRESHOLDS
| float | `0.7` | Master max system cpu usage, when the master's system cpu
usage is smaller then this value, master server can execute workflow. |
|
master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS
| float | `0.7` | Master max System memory usage , when the master's system
memory usage is smaller then this value, master server can execute workflow. |
+| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_TASK_INSTANCE_RUNTIME | string
| `"0d"` | Maximum allowed running time for a task instance. If the running
duration exceeds this value, the instance will be killed. The default value of
0d indicates no limit. |
+| master.env.MASTER_SERVER_LOAD_PROTECTION_MAX_WORKFLOW_INSTANCE_RUNTIME |
string | `"0d"` | Maximum allowed running time for a workflow instance. If the
running duration exceeds this value, the instance will be killed. The default
value of 0d indicates no limit. |
| master.env.MASTER_STATE_WHEEL_INTERVAL | string | `"5s"` | master state
wheel interval, the unit is second |
| master.env.MASTER_TASK_COMMIT_INTERVAL | string | `"1s"` | master commit
task interval, the unit is second |
| master.env.MASTER_TASK_COMMIT_RETRYTIMES | string | `"5"` | Master commit
task retry times |
diff --git a/deploy/kubernetes/dolphinscheduler/values.yaml
b/deploy/kubernetes/dolphinscheduler/values.yaml
index 3beefea212..c9d545896c 100644
--- a/deploy/kubernetes/dolphinscheduler/values.yaml
+++ b/deploy/kubernetes/dolphinscheduler/values.yaml
@@ -565,6 +565,10 @@ master:
MASTER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS: 0.7
# -- Master max disk usage , when the master's disk usage is smaller then
this value, master server can execute workflow.
MASTER_SERVER_LOAD_PROTECTION_MAX_DISK_USAGE_PERCENTAGE_THRESHOLDS: 0.7
+ # -- Maximum allowed running time for a workflow instance. If the running
duration exceeds this value, the instance will be killed. The default value of
0d indicates no limit.
+ MASTER_SERVER_LOAD_PROTECTION_MAX_WORKFLOW_INSTANCE_RUNTIME: 0d
+ # -- Maximum allowed running time for a task instance. If the running
duration exceeds this value, the instance will be killed. The default value of
0d indicates no limit.
+ MASTER_SERVER_LOAD_PROTECTION_MAX_TASK_INSTANCE_RUNTIME: 0d
# -- Master failover interval, the unit is minute
MASTER_FAILOVER_INTERVAL: "10m"
# -- Master kill application when handle failover
diff --git a/docs/docs/en/architecture/configuration.md
b/docs/docs/en/architecture/configuration.md
index 27c289bcec..a45048115d 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -275,24 +275,26 @@ Location: `api-server/conf/application.yaml`
Location: `master-server/conf/application.yaml`
-| Parameters
| Default value |
Description
|
-|-----------------------------------------------------------------------------|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|
-| master.listen-port
| 5678 | master listen port
|
-| master.logic-task-config.task-executor-thread-count
| 2 * CPU +1 | The thread size used to execute logic task
|
-| master.worker-load-balancer-configuration-properties.type
| DYNAMIC_WEIGHTED_ROUND_ROBIN | Master will use the worker's
cpu/memory/threadPool usage to calculate the worker load, the lower load will
have more change to be dispatched task |
-| master.max-heartbeat-interval
| 10s | master max heartbeat interval
|
-| master.server-load-protection.enabled
| true | If set true, will open master overload
protection
|
-| master.server-load-protection.max-system-cpu-usage-percentage-thresholds
| 0.8 | Master max system cpu usage, when the master's
system cpu usage is smaller then this value, master server can execute
workflow. |
-| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds
| 0.8 | Master max JVM cpu usage, when the master's
jvm cpu usage is smaller then this value, master server can execute workflow.
|
-| master.server-load-protection.max-system-memory-usage-percentage-thresholds
| 0.8 | Master max system memory usage , when the
master's system memory usage is smaller then this value, master server can
execute workflow. |
-| master.server-load-protection.max-disk-usage-percentage-thresholds
| 0.8 | Master max disk usage , when the master's disk
usage is smaller then this value, master server can execute workflow.
|
-| master.server-load-protection.max-concurrent-workflow-instances
| 2147483647 | Master max concurrent workflow instances, when
the master's workflow instance count reaches or exceeds this value, master
server will be marked as busy. |
-| master.worker-group-refresh-interval
| 10s | The interval to refresh worker group from db
to memory
|
-| master.command-fetch-strategy.type
| ID_SLOT_BASED | The command fetch strategy, only support
`ID_SLOT_BASED`
|
-| master.command-fetch-strategy.config.id-step
| 1 | The id auto incremental step of t_ds_command
in db
|
-| master.command-fetch-strategy.config.fetch-size
| 10 | The number of commands fetched by master
|
-| master.task-dispatch-policy.dispatch-timeout-enabled
| false | Indicates whether the dispatch timeout
checking mechanism is enabled
|
-| master.task-dispatch-policy.max-task-dispatch-duration
| 1h | The maximum allowed duration a task may wait
in the dispatch queue before being assigned to a worker
|
+| Parameters
| Default value |
Description
|
+|-----------------------------------------------------------------------------|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| master.listen-port
| 5678 | master listen port
|
+| master.logic-task-config.task-executor-thread-count
| 2 * CPU +1 | The thread size used to execute logic task
|
+| master.worker-load-balancer-configuration-properties.type
| DYNAMIC_WEIGHTED_ROUND_ROBIN | Master will use the worker's
cpu/memory/threadPool usage to calculate the worker load, the lower load will
have more change to be dispatched task
|
+| master.max-heartbeat-interval
| 10s | master max heartbeat interval
|
+| master.server-load-protection.enabled
| true | If set true, will open master overload
protection
|
+| master.server-load-protection.max-system-cpu-usage-percentage-thresholds
| 0.8 | Master max system cpu usage, when the master's
system cpu usage is smaller then this value, master server can execute
workflow. |
+| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds
| 0.8 | Master max JVM cpu usage, when the master's
jvm cpu usage is smaller then this value, master server can execute workflow.
|
+| master.server-load-protection.max-system-memory-usage-percentage-thresholds
| 0.8 | Master max system memory usage , when the
master's system memory usage is smaller then this value, master server can
execute workflow. |
+| master.server-load-protection.max-disk-usage-percentage-thresholds
| 0.8 | Master max disk usage , when the master's disk
usage is smaller then this value, master server can execute workflow.
|
+| master.server-load-protection.max-concurrent-workflow-instances
| 2147483647 | Master max concurrent workflow instances, when
the master's workflow instance count reaches or exceeds this value, master
server will be marked as busy. |
+| master.server-load-protection.max-workflow-instance-runtime                 | 0d                           | Maximum allowed running time for a workflow instance. If the running duration exceeds this value, the instance will be killed. The default value 0d indicates no limit; any other value must be at least 1m.                   |
+| master.server-load-protection.max-task-instance-runtime                     | 0d                           | Maximum allowed running time for a task instance. If the running duration exceeds this value, the instance will be killed. The default value 0d indicates no limit; any other value must be at least 1m.                       |
+| master.worker-group-refresh-interval
| 10s | The interval to refresh worker group from db
to memory
|
+| master.command-fetch-strategy.type
| ID_SLOT_BASED | The command fetch strategy, only support
`ID_SLOT_BASED`
|
+| master.command-fetch-strategy.config.id-step
| 1 | The id auto incremental step of t_ds_command
in db
|
+| master.command-fetch-strategy.config.fetch-size
| 10 | The number of commands fetched by master
|
+| master.task-dispatch-policy.dispatch-timeout-enabled
| false | Indicates whether the dispatch timeout
checking mechanism is enabled
|
+| master.task-dispatch-policy.max-task-dispatch-duration
| 1h | The maximum allowed duration a task may wait
in the dispatch queue before being assigned to a worker
|
### Worker Server related configuration
diff --git a/docs/docs/zh/architecture/configuration.md
b/docs/docs/zh/architecture/configuration.md
index 80fed042f1..cc1e901ca9 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -305,22 +305,24 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
位置:`worker-server/conf/application.yaml`
-| 参数
| 默认值 | 描述
|
-|-----------------------------------------------------------------------------|-----------|-----------------------------------------------------------------------------------------|
-| worker.listen-port
| 1234 | worker监听端口
|
-| worker.max-heartbeat-interval
| 10s | worker最大心跳间隔
|
-| worker.host-weight
| 100 | 派发任务时,worker主机的权重
|
-| worker.tenant-auto-create
| true |
租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。
|
-| worker.server-load-protection.enabled
| true | 是否开启系统保护策略
|
-| worker.server-load-protection.max-system-cpu-usage-percentage-thresholds
| 0.8 | worker最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,worker服务才能接收任务.
默认值为0.8: 会使用80%的操作系统CPU |
-| worker.server-load-protection.max-jvm-cpu-usage-percentage-thresholds
| 0.8 | worker最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,worker服务才能接收任务.
默认值为0.8: 会使用80%的JVM CPU |
-| worker.server-load-protection.max-system-memory-usage-percentage-thresholds
| 0.8 | worker最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,worker服务才能接收任务. 默认值为0.8:
会使用80%的操作系统内存 |
-| worker.server-load-protection.max-disk-usage-percentage-thresholds
| 0.8 | worker最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,worker服务才能接收任务. 默认值为0.8:
会使用80%的操作系统磁盘空间 |
-| worker.alert-listen-host
| localhost | alert监听host
|
-| worker.alert-listen-port
| 50052 | alert监听端口
|
-| worker.physical-task-config.task-executor-thread-size
| 100 | Worker中任务最大并发度
|
-| worker.tenant-config.auto-create-tenant-enabled
| true |
租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。
|
-| worker.tenant-config.default-tenant-enabled
| false | 如果设置为true, 将会使用worker服务启动用户作为 `default` 租户。
|
+| 参数                                                                          | 默认值       | 描述                                                                                      |
+|-----------------------------------------------------------------------------|-----------|-----------------------------------------------------------------------------------------|
+| worker.listen-port                                                          | 1234      | worker监听端口                                                                              |
+| worker.max-heartbeat-interval                                               | 10s       | worker最大心跳间隔                                                                            |
+| worker.host-weight                                                          | 100       | 派发任务时,worker主机的权重                                                                       |
+| worker.tenant-auto-create                                                   | true      | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。                 |
+| worker.server-load-protection.enabled                                       | true      | 是否开启系统保护策略                                                                              |
+| worker.server-load-protection.max-system-cpu-usage-percentage-thresholds    | 0.8       | worker最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的操作系统CPU        |
+| worker.server-load-protection.max-jvm-cpu-usage-percentage-thresholds       | 0.8       | worker最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的JVM CPU    |
+| worker.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.8       | worker最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的操作系统内存          |
+| worker.server-load-protection.max-disk-usage-percentage-thresholds          | 0.8       | worker最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,worker服务才能接收任务. 默认值为0.8: 会使用80%的操作系统磁盘空间         |
+| master.server-load-protection.max-workflow-instance-runtime                 | 0d        | 一个工作流实例最大的运行时间,如果超过这个时间,实例会被kill。 默认值为 0d 表示没有限制, 最小值为1分钟。                                 |
+| master.server-load-protection.max-task-instance-runtime                     | 0d        | 一个任务实例最大的运行时间,如果超过这个时间,实例会被kill。 默认值为 0d 表示没有限制, 最小值为1分钟。                                  |
+| worker.alert-listen-host                                                    | localhost | alert监听host                                                                             |
+| worker.alert-listen-port                                                    | 50052     | alert监听端口                                                                               |
+| worker.physical-task-config.task-executor-thread-size                       | 100       | Worker中任务最大并发度                                                                          |
+| worker.tenant-config.auto-create-tenant-enabled                             | true      | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。                 |
+| worker.tenant-config.default-tenant-enabled                                 | false     | 如果设置为true, 将会使用worker服务启动用户作为 `default` 租户。                                              |
## Alert Server相关配置
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index b486b38092..4d2ecc3713 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -114,6 +114,7 @@ public class MasterConfig implements Validator {
if (StringUtils.isEmpty(masterConfig.getMasterAddress())) {
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
+ serverLoadProtection.validate(errors);
commandFetchStrategy.validate(errors);
workerLoadBalancerConfigurationProperties.validate(errors);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java
index c1cda12ce0..0a11b8bfb8 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterServerLoadProtectionConfig.java
@@ -19,13 +19,36 @@ package org.apache.dolphinscheduler.server.master.config;
import
org.apache.dolphinscheduler.meter.metrics.BaseServerLoadProtectionConfig;
+import java.time.Duration;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.springframework.validation.Errors;
+
@Data
@EqualsAndHashCode(callSuper = true)
public class MasterServerLoadProtectionConfig extends BaseServerLoadProtectionConfig {
private int maxConcurrentWorkflowInstances = Integer.MAX_VALUE;
+    /**
+     * Maximum allowed running time of a workflow instance; a zero duration disables the limit,
+     * any other value must be at least one minute.
+     * NOTE(review): nothing in this change reads this value, so the workflow-level limit does not
+     * appear to be enforced yet (only the task-level limit is wired up) — confirm before relying on it.
+     */
+    private Duration maxWorkflowInstanceRuntime = Duration.ofDays(0);
+
+    /**
+     * Maximum allowed running time of a task instance; a zero duration disables the limit,
+     * any other value must be at least one minute.
+     */
+    private Duration maxTaskInstanceRuntime = Duration.ofDays(0);
+
+    /**
+     * Validates the load-protection settings and records every violation in {@code errors}.
+     * NOTE(review): field names are resolved relative to the given Errors; MasterConfig passes its own
+     * Errors, so reported paths may lack the "serverLoadProtection" prefix — confirm.
+     *
+     * @param errors the errors holder supplied by the enclosing configuration's validator
+     */
+    public void validate(Errors errors) {
+        if (maxConcurrentWorkflowInstances <= 0) {
+            errors.rejectValue("maxConcurrentWorkflowInstances", null,
+                    "maxConcurrentWorkflowInstances must be greater than 0");
+        }
+        validateInstanceRuntime(errors, "maxWorkflowInstanceRuntime", maxWorkflowInstanceRuntime);
+        validateInstanceRuntime(errors, "maxTaskInstanceRuntime", maxTaskInstanceRuntime);
+    }
+
+    /**
+     * Rejects a runtime limit that is null, or that is non-zero but shorter than one minute.
+     * A null value presumably appears when the property is present but left blank — it is rejected
+     * here instead of failing later with a NullPointerException.
+     */
+    private static void validateInstanceRuntime(Errors errors, String field, Duration runtime) {
+        if (runtime == null) {
+            errors.rejectValue(field, null, field + " must not be null, use 0d to disable it");
+            return;
+        }
+        if (!runtime.isZero() && runtime.compareTo(Duration.ofMinutes(1)) < 0) {
+            errors.rejectValue(field, null, field + " must be 0 (disabled) or >= 1m");
+        }
+    }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java
index d6c7348b7f..5ce6d109a6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java
@@ -19,8 +19,8 @@ package
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event;
import static com.google.common.base.Preconditions.checkState;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
@@ -35,23 +35,27 @@ public class TaskTimeoutLifecycleEvent extends
AbstractTaskLifecycleEvent {
private final ITaskExecutionRunnable taskExecutionRunnable;
+    /**
+     * Strategy applied when this timeout fires. It is carried on the event so that the user-defined task
+     * timeout and the system-wide maximum task runtime can be handled with different strategies.
+     */
+    private final TaskTimeoutStrategy timeoutStrategy;
+
protected TaskTimeoutLifecycleEvent(final ITaskExecutionRunnable taskExecutionRunnable,
+                                        final TaskTimeoutStrategy timeoutStrategy,
final long timeout) {
super(timeout);
+        this.timeoutStrategy = timeoutStrategy;
this.taskExecutionRunnable = taskExecutionRunnable;
}
-    public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable) {
-        final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition();
+    /**
+     * Creates a timeout event that should fire {@code timeoutInMinutes} after the task instance's submit time.
+     *
+     * @param taskExecutionRunnable the task to monitor
+     * @param timeoutStrategy       the strategy to apply on timeout, must not be null
+     * @param timeoutInMinutes      the timeout measured from the submit time, must be >= 0
+     * @return the timeout event, with a delay clamped at zero when the deadline has already passed
+     */
+    public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable,
+                                               final TaskTimeoutStrategy timeoutStrategy,
+                                               final long timeoutInMinutes) {
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
-        checkState(taskDefinition != null, "The task instance must be initialized before retrying.");
-        final int timeout = taskDefinition.getTimeout();
-        checkState(timeout >= 0, "The task timeout: %s must >=0 minutes", timeout);
+        checkState(timeoutStrategy != null, "The task timeoutStrategy must not be null");
+        checkState(timeoutInMinutes >= 0, "The task timeout: %s must >=0 minutes", timeoutInMinutes);
-        long delayTime = System.currentTimeMillis() - taskInstance.getSubmitTime().getTime()
-                + TimeUnit.MINUTES.toMillis(timeout);
-        return new TaskTimeoutLifecycleEvent(taskExecutionRunnable, delayTime);
+        // The remaining time until the deadline is (submit + timeout - now); the previous formula added the
+        // elapsed time instead of subtracting it, so tasks submitted long ago (e.g. recovered after failover)
+        // timed out late. NOTE(review): assumes the base-class delay is measured from event creation — confirm.
+        final long deadline = taskInstance.getSubmitTime().getTime() + TimeUnit.MINUTES.toMillis(timeoutInMinutes);
+        final long delayTime = Math.max(0L, deadline - System.currentTimeMillis());
+        return new TaskTimeoutLifecycleEvent(taskExecutionRunnable, timeoutStrategy, delayTime);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskStartLifecycleEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskStartLifecycleEventHandler.java
index fc86fe74b9..5173df3bfa 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskStartLifecycleEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskStartLifecycleEventHandler.java
@@ -18,6 +18,8 @@
package
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskStartLifecycleEvent;
@@ -28,12 +30,16 @@ import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkf
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TaskStartLifecycleEventHandler extends
AbstractTaskLifecycleEventHandler<TaskStartLifecycleEvent> {
+ @Autowired
+ private MasterConfig masterConfig;
+
@Override
public void handle(final IWorkflowExecutionRunnable
workflowExecutionRunnable,
final TaskStartLifecycleEvent taskStartLifecycleEvent) {
@@ -63,13 +69,20 @@ public class TaskStartLifecycleEventHandler extends
AbstractTaskLifecycleEventHa
private void taskTimeoutMonitor(final ITaskExecutionRunnable taskExecutionRunnable) {
final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition();
-        if (taskDefinition.getTimeout() <= 0) {
-            log.debug("The task {} timeout {} is invalided, so the timeout monitor will not be started.",
-                    taskDefinition.getName(), taskDefinition.getTimeout());
-            return;
+        // User-defined timeout: only monitored when both a positive timeout and a strategy are configured.
+        final int taskTimeout = taskDefinition.getTimeout();
+        final TaskTimeoutStrategy taskTimeoutStrategy = taskDefinition.getTimeoutNotifyStrategy();
+        if (taskTimeout > 0 && taskTimeoutStrategy != null) {
+            log.debug("Start the timeout monitor of task {}, timeout: {} minutes, strategy: {}",
+                    taskDefinition.getName(), taskTimeout, taskTimeoutStrategy);
+            taskExecutionRunnable.getWorkflowEventBus().publish(
+                    TaskTimeoutLifecycleEvent.of(taskExecutionRunnable, taskTimeoutStrategy, taskTimeout));
+        }
+
+        // System-wide limit (master.server-load-protection.max-task-instance-runtime), applied with the FAILED
+        // strategy independently of the user-defined timeout; a zero duration disables it. Kept as long to
+        // avoid the narrowing cast the minute count does not need.
+        final long systemTimeoutInMinutes =
+                masterConfig.getServerLoadProtection().getMaxTaskInstanceRuntime().toMinutes();
+        if (systemTimeoutInMinutes > 0) {
+            taskExecutionRunnable.getWorkflowEventBus().publish(TaskTimeoutLifecycleEvent.of(
+                    taskExecutionRunnable, TaskTimeoutStrategy.FAILED, systemTimeoutInMinutes));
}
-        taskExecutionRunnable.getWorkflowEventBus().publish(TaskTimeoutLifecycleEvent.of(taskExecutionRunnable));
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java
index 5dfb0ec839..419225fa92 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java
@@ -17,7 +17,6 @@
package
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -55,9 +54,8 @@ public class TaskTimeoutLifecycleEventHandler extends
AbstractTaskLifecycleEvent
// The task instance is not active, means it is already finished.
return;
}
- final TaskDefinition taskDefinition =
taskExecutionRunnable.getTaskDefinition();
final String taskName = taskExecutionRunnable.getName();
- final TaskTimeoutStrategy timeoutNotifyStrategy =
taskDefinition.getTimeoutNotifyStrategy();
+ final TaskTimeoutStrategy timeoutNotifyStrategy =
taskTimeoutLifecycleEvent.getTimeoutStrategy();
if (timeoutNotifyStrategy == null) {
log.info("The task {} TimeoutStrategy is null.", taskName);
return;
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml
b/dolphinscheduler-master/src/main/resources/application.yaml
index 81c7ae3aed..29eff533c8 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -110,6 +110,10 @@ master:
max-disk-usage-percentage-thresholds: 0.8
# Master max concurrent workflow instances, when the master's workflow
instance count exceeds this value, master server will be marked as busy.
max-concurrent-workflow-instances: 2147483647
+    # Maximum allowed running time for a workflow instance. If the running duration exceeds this value, the instance will be killed. The default value 0d indicates no limit; any other value must be at least 1m.
+    max-workflow-instance-runtime: 0d
+    # Maximum allowed running time for a task instance. If the running duration exceeds this value, the instance will be killed. The default value 0d indicates no limit; any other value must be at least 1m.
+    max-task-instance-runtime: 0d
worker-group-refresh-interval: 5m
# Task dispatch timeout check (currently disabled).
# When enabled, tasks not dispatched within this duration are marked as
failed.
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 6277351c29..498fb5ecb4 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -1288,6 +1288,39 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
masterContainer.assertAllResourceReleased();
}
+    @Test
+    @DisplayName("Test start a workflow which contains a dep task will be kill by system timeout")
+    public void testStartWorkflow_withSystemTimeoutKillTask() {
+        // masterConfig is presumably the container-managed bean shared with the other test cases,
+        // so the temporary 1 minute limit is restored in the finally block.
+        final Duration originalMaxTaskInstanceRuntime =
+                masterConfig.getServerLoadProtection().getMaxTaskInstanceRuntime();
+        masterConfig.getServerLoadProtection().setMaxTaskInstanceRuntime(Duration.ofMinutes(1));
+        try {
+            final String yaml = "/it/start/workflow_with_system_timeout_kill_task.yaml";
+            final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+            final WorkflowDefinition workflow = context.getWorkflow("workflow_with_timeout_kill_task");
+
+            final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+                    .workflowDefinition(workflow)
+                    .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                    .build();
+            workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+            // The dependent task waits on a workflow that never runs, so only the system limit can end it.
+            await()
+                    .atMost(Duration.ofSeconds(90))
+                    .untilAsserted(() -> {
+                        Assertions
+                                .assertThat(repository.queryWorkflowInstance(workflow))
+                                .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
+                                        .isEqualTo(WorkflowExecutionStatus.STOP));
+                        Assertions
+                                .assertThat(repository.queryTaskInstance(workflow))
+                                .hasSize(1)
+                                .satisfiesExactly(taskInstance -> {
+                                    assertThat(taskInstance.getName()).isEqualTo("dep_task_with_timeout_killed");
+                                    assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
+                                });
+                    });
+            masterContainer.assertAllResourceReleased();
+        } finally {
+            masterConfig.getServerLoadProtection().setMaxTaskInstanceRuntime(originalMaxTaskInstanceRuntime);
+        }
+    }
+
@Test
@DisplayName("Test start a workflow with task depend type TASK_ONLY")
public void testStartWorkflow_withTaskOnlyStrategy() {
diff --git a/dolphinscheduler-master/src/test/resources/application.yaml
b/dolphinscheduler-master/src/test/resources/application.yaml
index f485dd0c5f..a4a50867ce 100644
--- a/dolphinscheduler-master/src/test/resources/application.yaml
+++ b/dolphinscheduler-master/src/test/resources/application.yaml
@@ -64,6 +64,10 @@ master:
max-system-memory-usage-percentage-thresholds: 0.9
# Master max disk usage , when the master's disk usage is smaller then
this value, master server can execute workflow.
max-disk-usage-percentage-thresholds: 0.9
+    # Maximum allowed running time for a workflow instance. If the running duration exceeds this value, the instance will be killed. The default value 0d indicates no limit; any other value must be at least 1m.
+    max-workflow-instance-runtime: 0m
+    # Maximum allowed running time for a task instance. If the running duration exceeds this value, the instance will be killed. The default value 0d indicates no limit; any other value must be at least 1m.
+    max-task-instance-runtime: 0m
worker-load-balancer-configuration-properties:
# RANDOM, ROUND_ROBIN, FIXED_WEIGHTED_ROUND_ROBIN,
DYNAMIC_WEIGHTED_ROUND_ROBIN
type: DYNAMIC_WEIGHTED_ROUND_ROBIN
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_system_timeout_kill_task.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_system_timeout_kill_task.yaml
new file mode 100644
index 0000000000..a3c32ed997
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_system_timeout_kill_task.yaml
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_one_fake_task_success
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+ - name: workflow_with_timeout_kill_task
+ code: 2
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single timeout task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: B
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":[],"shellScript":"if [
\"${system.project.name}\" = \"MasterIntegrationTest\" ]; then\n exit 0
\nelse\n exit 1\nfi","resourceList":[]}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: dep_task_with_timeout_killed
+ code: 2
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: DEPENDENT
+ taskParams:
'{"localParams":[],"resourceList":[],"dependence":{"checkInterval":10,"failurePolicy":"DEPENDENT_FAILURE_FAILURE","relation":"AND","dependTaskList":[{"relation":"AND","dependItemList":[{"dependentType":"DEPENDENT_ON_WORKFLOW","projectCode":1,"definitionCode":1,"depTaskCode":0,"cycle":"day","dateValue":"last1Days"}]}]}}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 2
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 2
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+