This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b12b682d3f [Improvement-17795][Master] Add dispatch timeout checking 
logic to handle cases where the worker group does not exist or no workers are 
available (#17796)
b12b682d3f is described below

commit b12b682d3fde9ea0b7af6afdb851b5da6d772832
Author: njnu-seafish <[email protected]>
AuthorDate: Mon Feb 2 21:39:59 2026 +0800

    [Improvement-17795][Master] Add dispatch timeout checking logic to handle 
cases where the worker group does not exist or no workers are available (#17796)
---
 docs/docs/en/architecture/configuration.md         |   2 +
 docs/docs/zh/architecture/configuration.md         |   2 +
 .../server/master/config/MasterConfig.java         |  15 ++
 .../server/master/config/TaskDispatchPolicy.java   |  43 ++++
 .../PhysicalTaskExecutorClientDelegator.java       |  21 +-
 .../task/dispatcher/WorkerGroupDispatcher.java     |  59 ++++-
 .../WorkerGroupDispatcherCoordinator.java          |   9 +-
 .../dispatch/NoAvailableWorkerException.java}      |  16 +-
 .../server/master/utils/ExceptionUtils.java        |  10 +
 .../src/main/resources/application.yaml            |   5 +
 .../server/master/config/MasterConfigTest.java     |  11 +
 .../WorkerGroupDispatcherCoordinatorTest.java      |  12 +-
 .../task/dispatcher/WorkerGroupDispatcherTest.java | 244 ++++++++++++++++++++-
 .../integration/cases/WorkflowStartTestCase.java   | 159 ++++++++++++++
 .../src/test/resources/application.yaml            |   5 +
 .../start/workflow_with_no_available_worker.yaml   |  61 ++++++
 .../workflow_with_worker_group_not_found.yaml      |  61 ++++++
 .../plugin/task/api/TaskExecutionContext.java      |   2 +
 18 files changed, 707 insertions(+), 30 deletions(-)

diff --git a/docs/docs/en/architecture/configuration.md 
b/docs/docs/en/architecture/configuration.md
index b0a5548e7b..27c289bcec 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -291,6 +291,8 @@ Location: `master-server/conf/application.yaml`
 | master.command-fetch-strategy.type                                          
| ID_SLOT_BASED                | The command fetch strategy, only support 
`ID_SLOT_BASED`                                                                 
                                |
 | master.command-fetch-strategy.config.id-step                                
| 1                            | The id auto incremental step of t_ds_command 
in db                                                                           
                            |
 | master.command-fetch-strategy.config.fetch-size                             
| 10                           | The number of commands fetched by master       
                                                                                
                          |
+| master.task-dispatch-policy.dispatch-timeout-enabled                        
| false                        | Indicates whether the dispatch timeout 
checking mechanism is enabled                                                   
                                  |
+| master.task-dispatch-policy.max-task-dispatch-duration                      
| 1h                           | The maximum allowed duration a task may wait 
in the dispatch queue before being assigned to a worker                         
                            |
 
 ### Worker Server related configuration
 
diff --git a/docs/docs/zh/architecture/configuration.md 
b/docs/docs/zh/architecture/configuration.md
index 0c4973d32c..80fed042f1 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -298,6 +298,8 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
 | master.command-fetch-strategy.type                                          
| ID_SLOT_BASED                | Command拉取策略, 目前仅支持 `ID_SLOT_BASED`             
                                         |
 | master.command-fetch-strategy.config.id-step                                
| 1                            | 数据库中t_ds_command的id自增步长                        
                                         |
 | master.command-fetch-strategy.config.fetch-size                             
| 10                           | master拉取command数量                              
                                         |
+| master.task-dispatch-policy.dispatch-timeout-enabled                        
| false                        | 是否开启master分派超时检测功能                             
                                         |
+| master.task-dispatch-policy.max-task-dispatch-duration                      
| 1h                           | master分派检测的超时时长,默认为一小时                         
                                         |
 
 ## Worker Server相关配置
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 220062bd57..b486b38092 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -74,6 +74,8 @@ public class MasterConfig implements Validator {
      */
     private String masterRegistryPath;
 
+    private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+
     @Override
     public boolean supports(Class<?> clazz) {
         return MasterConfig.class.isAssignableFrom(clazz);
@@ -97,6 +99,18 @@ public class MasterConfig implements Validator {
         if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) {
             errors.rejectValue("worker-group-refresh-interval", null, "should 
>= 10s");
         }
+
+        TaskDispatchPolicy dispatchPolicy = 
masterConfig.getTaskDispatchPolicy();
+        if (dispatchPolicy.isDispatchTimeoutEnabled()) {
+            if (dispatchPolicy.getMaxTaskDispatchDuration() == null) {
+                
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
+                        "must be specified when dispatch timeout checker is 
enabled");
+            } else if (dispatchPolicy.getMaxTaskDispatchDuration().toMillis() 
<= 0) {
+                
errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
+                        "must be a positive duration (e.g., '10m', '30m', 
'1h')");
+            }
+        }
+
         if (StringUtils.isEmpty(masterConfig.getMasterAddress())) {
             
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
         }
@@ -122,6 +136,7 @@ public class MasterConfig implements Validator {
                         "\n  command-fetch-strategy: " + commandFetchStrategy +
                         "\n  worker-load-balancer-configuration-properties: "
                         + workerLoadBalancerConfigurationProperties +
+                        "\n  taskDispatchPolicy: " + taskDispatchPolicy +
                         "\n****************************Master 
Configuration**************************************";
         log.info(config);
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java
new file mode 100644
index 0000000000..3c93dc5d2b
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.config;
+
+import java.time.Duration;
+
+import lombok.Data;
+
+/**
+ * Configuration for the master's task dispatch policy.
+ * When enabled, a task whose dispatch attempt fails after it has waited longer than
+ * {@link #maxTaskDispatchDuration} is marked as failed instead of being re-queued, preventing indefinite queuing.
+ */
+@Data
+public class TaskDispatchPolicy {
+
+    /**
+     * Indicates whether the dispatch timeout checking mechanism is enabled.
+     */
+    private boolean dispatchTimeoutEnabled = false;
+
+    /**
+     * The maximum allowed duration a task may wait in the dispatch queue 
before being assigned to a worker.
+     * Tasks that exceed this duration will be marked as failed.
+     * Examples: {@code "10m"}, {@code "30m"}, {@code "1h"}.
+     */
+    private Duration maxTaskDispatchDuration;
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
index f7566d3133..96331b2dca 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
@@ -24,10 +24,13 @@ import 
org.apache.dolphinscheduler.extract.base.client.Clients;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
 import 
org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
 import 
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.IWorkerLoadBalancer;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
 import 
org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+import 
org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
 import 
org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
 import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchRequest;
 import 
org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchResponse;
@@ -55,18 +58,26 @@ public class PhysicalTaskExecutorClientDelegator implements 
ITaskExecutorClientD
     @Autowired
     private IWorkerLoadBalancer workerLoadBalancer;
 
+    @Autowired
+    private ClusterManager clusterManager;
+
     @Override
     public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable) 
throws TaskDispatchException {
         final TaskExecutionContext taskExecutionContext = 
taskExecutionRunnable.getTaskExecutionContext();
         final String taskName = taskExecutionContext.getTaskName();
+        final String workerGroup = taskExecutionContext.getWorkerGroup();
+
+        // Reject the task when its worker group does not exist.
+        if 
(!clusterManager.getWorkerClusters().containsWorkerGroup(workerGroup)) {
+            throw new WorkerGroupNotFoundException(workerGroup);
+        }
+
+        // select an available worker from the worker group; throws 
NoAvailableWorkerException if none is available.
         final String physicalTaskExecutorAddress = workerLoadBalancer
-                .select(taskExecutionContext.getWorkerGroup())
+                .select(workerGroup)
                 .map(Host::of)
                 .map(Host::getAddress)
-                .orElseThrow(() -> new TaskDispatchException(
-                        String.format("Cannot find the host to dispatch 
Task[id=%s, name=%s, workerGroup=%s]",
-                                taskExecutionContext.getTaskInstanceId(), 
taskName,
-                                taskExecutionContext.getWorkerGroup())));
+                .orElseThrow(() -> new 
NoAvailableWorkerException(workerGroup));
 
         taskExecutionContext.setHost(physicalTaskExecutorAddress);
         
taskExecutionRunnable.getTaskInstance().setHost(physicalTaskExecutorAddress);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
index 28834e27e7..38f5533ba7 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
@@ -18,12 +18,16 @@
 package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
 
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
 import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
 import 
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
+import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
 
+import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,11 +52,22 @@ public class WorkerGroupDispatcher extends BaseDaemonThread 
{
 
     private final AtomicBoolean runningFlag = new AtomicBoolean(false);
 
-    public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient 
taskExecutorClient) {
+    private final TaskDispatchPolicy taskDispatchPolicy;
+
+    private final long maxTaskDispatchMillis;
+
+    public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient 
taskExecutorClient,
+                                 TaskDispatchPolicy taskDispatchPolicy) {
         super("WorkerGroupTaskDispatcher-" + workerGroupName);
         this.taskExecutorClient = taskExecutorClient;
         this.workerGroupEventBus = new TaskDispatchableEventBus<>();
         this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
+        this.taskDispatchPolicy = taskDispatchPolicy;
+        if (taskDispatchPolicy.isDispatchTimeoutEnabled()) {
+            this.maxTaskDispatchMillis = 
taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
+        } else {
+            this.maxTaskDispatchMillis = 0L;
+        }
         log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
     }
 
@@ -84,26 +99,54 @@ public class WorkerGroupDispatcher extends BaseDaemonThread 
{
     }
 
     private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+        final int taskInstanceId = taskExecutionRunnable.getId();
+        final TaskExecutionContext taskExecutionContext = 
taskExecutionRunnable.getTaskExecutionContext();
         try {
-            if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) 
{
+            if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
                 log.info(
                         "The task: {} doesn't exist in 
waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
-                        taskExecutionRunnable.getId());
+                        taskInstanceId);
                 return;
             }
             taskExecutorClient.dispatch(taskExecutionRunnable);
-        } catch (Exception e) {
+        } catch (Exception ex) {
+            if (taskDispatchPolicy.isDispatchTimeoutEnabled()) {
+                // If a dispatch timeout occurs, the task will not be put back 
into the queue.
+                long elapsed = System.currentTimeMillis() - 
taskExecutionContext.getFirstDispatchTime();
+                if (elapsed > maxTaskDispatchMillis) {
+                    onDispatchTimeout(taskExecutionRunnable, ex, elapsed, 
maxTaskDispatchMillis);
+                    return;
+                }
+            }
+
             // If dispatch failed, will put the task back to the queue
             // The task will be dispatched after waiting time.
             // the waiting time will increase multiple of times, but will not 
exceed 60 seconds
-            long waitingTimeMills = Math.min(
+            long waitingTimeMillis = Math.min(
                     
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 
1_000L, 60_000L);
-            dispatchTask(taskExecutionRunnable, waitingTimeMills);
-            log.error("Dispatch Task: {} failed will retry after: {}/ms", 
taskExecutionRunnable.getId(),
-                    waitingTimeMills, e);
+            dispatchTask(taskExecutionRunnable, waitingTimeMillis);
+            log.warn("Dispatch Task: {} failed will retry after: {}/ms", 
taskInstanceId,
+                    waitingTimeMillis, ex);
         }
     }
 
+    /**
+     * Publishes a {@link TaskFailedLifecycleEvent} for a task whose dispatch has timed out.
+     * Once called, this dispatcher does not put the task back into the dispatch queue.
+     */
+    private void onDispatchTimeout(ITaskExecutionRunnable 
taskExecutionRunnable, Exception ex,
+                                   long elapsed, long timeout) {
+        String taskName = taskExecutionRunnable.getName();
+        log.error("Task: {} dispatch timeout after {}ms (limit: {}ms)",
+                taskName, elapsed, timeout, ex);
+
+        final TaskFailedLifecycleEvent taskFailedEvent = 
TaskFailedLifecycleEvent.builder()
+                .taskExecutionRunnable(taskExecutionRunnable)
+                .endTime(new Date())
+                .build();
+        taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+    }
+
     /**
      * Adds a task to the worker group queue.
      * This method wraps the given task execution object into a priority and 
delay-based task entry and adds it to the worker group queue.
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
index a85674c6f4..086fc5359e 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
 
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 
@@ -41,8 +42,11 @@ public class WorkerGroupDispatcherCoordinator implements 
AutoCloseable {
 
     private final ConcurrentHashMap<String, WorkerGroupDispatcher> 
workerGroupDispatcherMap;
 
-    public WorkerGroupDispatcherCoordinator() {
+    private final MasterConfig masterConfig;
+
+    public WorkerGroupDispatcherCoordinator(final MasterConfig masterConfig) {
         workerGroupDispatcherMap = new ConcurrentHashMap<>();
+        this.masterConfig = masterConfig;
     }
 
     public void start() {
@@ -99,7 +103,8 @@ public class WorkerGroupDispatcherCoordinator implements 
AutoCloseable {
 
     private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String 
workerGroup) {
         return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
-            WorkerGroupDispatcher workerGroupDispatcher = new 
WorkerGroupDispatcher(wg, taskExecutorClient);
+            WorkerGroupDispatcher workerGroupDispatcher =
+                    new WorkerGroupDispatcher(wg, taskExecutorClient, 
masterConfig.getTaskDispatchPolicy());
             workerGroupDispatcher.start();
             return workerGroupDispatcher;
         });
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java
similarity index 59%
copy from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java
index 07156b58ab..d51e2342cc 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java
@@ -15,19 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.utils;
+package org.apache.dolphinscheduler.server.master.exception.dispatch;
 
-import 
org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
+public class NoAvailableWorkerException extends TaskDispatchException {
 
-import org.springframework.dao.DataAccessResourceFailureException;
-
-public class ExceptionUtils {
-
-    public static boolean isDatabaseConnectedFailedException(Throwable e) {
-        return e instanceof DataAccessResourceFailureException;
-    }
-
-    public static boolean isTaskExecutionContextCreateException(Throwable e) {
-        return e instanceof TaskExecutionContextCreateException;
+    public NoAvailableWorkerException(String workerGroup) {
+        super("Cannot find available worker under worker group: " + 
workerGroup);
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
index 07156b58ab..5c1233f7db 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
@@ -18,6 +18,8 @@
 package org.apache.dolphinscheduler.server.master.utils;
 
 import 
org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
+import 
org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
+import 
org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
 
 import org.springframework.dao.DataAccessResourceFailureException;
 
@@ -30,4 +32,12 @@ public class ExceptionUtils {
     public static boolean isTaskExecutionContextCreateException(Throwable e) {
         return e instanceof TaskExecutionContextCreateException;
     }
+
+    public static boolean isWorkerGroupNotFoundException(Throwable e) {
+        return e instanceof WorkerGroupNotFoundException;
+    }
+
+    public static boolean isNoAvailableWorkerException(Throwable e) {
+        return e instanceof NoAvailableWorkerException;
+    }
 }
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml 
b/dolphinscheduler-master/src/main/resources/application.yaml
index 39a0f4311a..81c7ae3aed 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -111,6 +111,11 @@ master:
     # Master max concurrent workflow instances, when the master's workflow 
instance count exceeds this value, master server will be marked as busy.
     max-concurrent-workflow-instances: 2147483647
   worker-group-refresh-interval: 5m
+  # Task dispatch timeout check (currently disabled).
+  # When enabled, tasks not dispatched within this duration are marked as 
failed.
+  task-dispatch-policy:
+    dispatch-timeout-enabled: false
+    max-task-dispatch-duration: 1h
   command-fetch-strategy:
     type: ID_SLOT_BASED
     config:
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
index 991d6c249e..8ca3e7538d 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
@@ -24,6 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import 
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerConfigurationProperties;
 import 
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerType;
 
+import java.time.Duration;
+
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import 
org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
@@ -72,4 +74,13 @@ public class MasterConfigTest {
         
assertThat(dynamicWeightConfigProperties.getCpuUsageWeight()).isEqualTo(30);
         
assertThat(dynamicWeightConfigProperties.getTaskThreadPoolUsageWeight()).isEqualTo(30);
     }
+
+    @Test
+    public void getTaskDispatchPolicy() {
+        TaskDispatchPolicy policy = masterConfig.getTaskDispatchPolicy();
+
+        assertThat(policy).isNotNull();
+        assertThat(policy.isDispatchTimeoutEnabled()).isFalse();
+        
assertThat(policy.getMaxTaskDispatchDuration()).isEqualTo(Duration.ofHours(1));
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
index e2e96a9614..8e838b724a 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
@@ -24,25 +24,33 @@ import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.test.util.ReflectionTestUtils;
 
 @ExtendWith(MockitoExtension.class)
 class WorkerGroupDispatcherCoordinatorTest {
 
-    @InjectMocks
     private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
 
     @Mock
     private ITaskExecutorClient taskExecutorClient;
 
+    @BeforeEach
+    void setUp() {
+        MasterConfig masterConfig = new MasterConfig();
+        workerGroupDispatcherCoordinator = new 
WorkerGroupDispatcherCoordinator(masterConfig);
+        ReflectionTestUtils.setField(workerGroupDispatcherCoordinator, 
"taskExecutorClient", taskExecutorClient);
+    }
+
     @Test
     void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
         String workerGroup = "newGroup";
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
index 77525cb181..210f7c21c9 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
@@ -18,21 +18,36 @@
 package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
 
 import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
+import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
 import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
 import 
org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+import 
org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
 
 import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -46,7 +61,9 @@ class WorkerGroupDispatcherTest {
     @BeforeEach
     void setUp() {
         taskExecutorClient = mock(ITaskExecutorClient.class);
-        dispatcher = new WorkerGroupDispatcher("TestGroup", 
taskExecutorClient);
+        final MasterConfig masterConfig = new MasterConfig();
+        dispatcher =
+                new WorkerGroupDispatcher("TestGroup", taskExecutorClient, 
masterConfig.getTaskDispatchPolicy());
     }
 
     @Test
@@ -138,4 +155,229 @@ class WorkerGroupDispatcherTest {
                 .untilAsserted(() -> verify(taskExecutorClient, 
times(2)).dispatch(taskExecutionRunnable));
     }
 
+    @Test
+    void dispatchTask_WorkerGroupNotFound_TimeoutDisabled_ShouldKeepRetrying() 
throws TaskDispatchException {
+        ITaskExecutionRunnable taskExecutionRunnable =
+                
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
+        WorkerGroupNotFoundException ex = new WorkerGroupNotFoundException("no 
worker group");
+        doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+        dispatcher.start();
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        await()
+                .atMost(Duration.ofSeconds(3))
+                .untilAsserted(() -> {
+                    verify(taskExecutorClient, 
atLeast(2)).dispatch(taskExecutionRunnable);
+                    verify(taskExecutionRunnable.getWorkflowEventBus(), 
never())
+                            .publish(any(TaskFailedLifecycleEvent.class));
+                });
+    }
+
+    @Test
+    void 
dispatchTask_WorkerGroupNotFound_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent()
 throws TaskDispatchException {
+        TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+        taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+        taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMillis(200));
+
+        dispatcher = new WorkerGroupDispatcher("TestGroup", 
taskExecutorClient, taskDispatchPolicy);
+
+        ITaskExecutionRunnable taskExecutionRunnable =
+                
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() - 
500);
+        WorkerGroupNotFoundException ex = new 
WorkerGroupNotFoundException("worker group not found");
+        doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+        dispatcher.start();
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        await()
+                .atMost(Duration.ofSeconds(2))
+                .untilAsserted(() -> {
+                    verify(taskExecutorClient, 
times(1)).dispatch(taskExecutionRunnable);
+                    
verify(taskExecutionRunnable.getWorkflowEventBus()).publish(
+                            argThat(event -> event instanceof 
TaskFailedLifecycleEvent &&
+                                    ((TaskFailedLifecycleEvent) event)
+                                            .getTaskExecutionRunnable() == 
taskExecutionRunnable));
+                });
+    }
+
+    // With a one-minute dispatch limit and a first-dispatch time only 100 ms in the past, a missing
+    // worker group must not produce a TaskFailedLifecycleEvent.
+    @Test
+    void dispatchTask_WorkerGroupNotFound_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent()
+            throws TaskDispatchException, InterruptedException {
+        TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+        taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+        taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMinutes(1));
+
+        dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, taskDispatchPolicy);
+
+        ITaskExecutionRunnable taskExecutionRunnable =
+                mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() - 100);
+        // The latch is released inside dispatch(), i.e. BEFORE the exception reaches the dispatcher.
+        // Waiting for a single attempt would let the never() check below race with the failure
+        // handling, so two attempts are awaited: the second one can only start after the first
+        // failure has been handled and the task has been scheduled for a retry.
+        CountDownLatch secondAttemptLatch = new CountDownLatch(2);
+        doAnswer(invocation -> {
+            secondAttemptLatch.countDown();
+            throw new WorkerGroupNotFoundException("Worker group 'TestGroup' does not exist");
+        }).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+        dispatcher.start();
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        // NOTE(review): 3 seconds mirrors the retry window used by the *_ShouldKeepRetrying tests;
+        // confirm it covers the dispatcher's retry delay.
+        assertTrue(secondAttemptLatch.await(3, TimeUnit.SECONDS));
+        verify(taskExecutionRunnable.getWorkflowEventBus(), never())
+                .publish(any(TaskFailedLifecycleEvent.class));
+    }
+
+    @Test
+    void dispatchTask_NoAvailableWorker_TimeoutDisabled_ShouldKeepRetrying() 
throws TaskDispatchException {
+        ITaskExecutionRunnable taskExecutionRunnable =
+                
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
+        NoAvailableWorkerException ex = new NoAvailableWorkerException("no 
worker");
+        doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+        dispatcher.start();
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        await()
+                .atMost(Duration.ofSeconds(3))
+                .untilAsserted(() -> {
+                    verify(taskExecutorClient, 
atLeast(2)).dispatch(taskExecutionRunnable);
+                    verify(taskExecutionRunnable.getWorkflowEventBus(), 
never())
+                            .publish(any(TaskFailedLifecycleEvent.class));
+                });
+    }
+
+    @Test
+    void 
dispatchTask_NoAvailableWorker_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent()
 throws TaskDispatchException {
+        TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+        taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+        taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMillis(200));
+
+        dispatcher = new WorkerGroupDispatcher("TestGroup", 
taskExecutorClient, taskDispatchPolicy);
+
+        ITaskExecutionRunnable taskExecutionRunnable =
+                
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() - 
500);
+        NoAvailableWorkerException ex = new NoAvailableWorkerException("no 
worker");
+        doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+        dispatcher.start();
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        await()
+                .atMost(Duration.ofSeconds(2))
+                .untilAsserted(() -> {
+                    verify(taskExecutorClient, 
times(1)).dispatch(taskExecutionRunnable);
+                    
verify(taskExecutionRunnable.getWorkflowEventBus()).publish(
+                            argThat(event -> event instanceof 
TaskFailedLifecycleEvent &&
+                                    ((TaskFailedLifecycleEvent) event)
+                                            .getTaskExecutionRunnable() == 
taskExecutionRunnable));
+                });
+    }
+
+    // With a one-minute dispatch limit and a first-dispatch time only 100 ms in the past, the absence
+    // of any worker must not produce a TaskFailedLifecycleEvent.
+    @Test
+    void dispatchTask_NoAvailableWorker_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent()
+            throws TaskDispatchException, InterruptedException {
+        TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+        taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+        taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMinutes(1));
+
+        dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, taskDispatchPolicy);
+
+        ITaskExecutionRunnable taskExecutionRunnable =
+                mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() - 100);
+        // The latch is released inside dispatch(), i.e. BEFORE the exception reaches the dispatcher.
+        // Waiting for a single attempt would let the never() check below race with the failure
+        // handling, so two attempts are awaited: the second one can only start after the first
+        // failure has been handled and the task has been scheduled for a retry.
+        CountDownLatch secondAttemptLatch = new CountDownLatch(2);
+        doAnswer(invocation -> {
+            secondAttemptLatch.countDown();
+            throw new NoAvailableWorkerException("no worker");
+        }).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+        dispatcher.start();
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        // NOTE(review): 3 seconds mirrors the retry window used by the *_ShouldKeepRetrying tests;
+        // confirm it covers the dispatcher's retry delay.
+        assertTrue(secondAttemptLatch.await(3, TimeUnit.SECONDS));
+        verify(taskExecutionRunnable.getWorkflowEventBus(), never())
+                .publish(any(TaskFailedLifecycleEvent.class));
+    }
+
+    @Test
+    void 
dispatchTask_GenericTaskDispatchException_TimeoutDisabled_ShouldKeepRetrying() 
throws TaskDispatchException {
+        ITaskExecutionRunnable taskExecutionRunnable =
+                
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
+        TaskDispatchException ex = new TaskDispatchException("generic dispatch 
error");
+        doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+        dispatcher.start();
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        await()
+                .atMost(Duration.ofSeconds(3))
+                .untilAsserted(() -> {
+                    verify(taskExecutorClient, 
atLeast(2)).dispatch(taskExecutionRunnable);
+                    verify(taskExecutionRunnable.getWorkflowEventBus(), 
never())
+                            .publish(any(TaskFailedLifecycleEvent.class));
+                });
+    }
+
+    @Test
+    void 
dispatchTask_GenericTaskDispatchException_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent()
 throws TaskDispatchException {
+        TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+        taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+        taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMillis(200));
+
+        dispatcher = new WorkerGroupDispatcher("TestGroup", 
taskExecutorClient, taskDispatchPolicy);
+
+        ITaskExecutionRunnable taskExecutionRunnable =
+                
mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() - 
500);
+        TaskDispatchException ex = new TaskDispatchException("generic dispatch 
error");
+        doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+        dispatcher.start();
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        await()
+                .atMost(Duration.ofSeconds(2))
+                .untilAsserted(() -> {
+                    verify(taskExecutorClient, 
times(1)).dispatch(taskExecutionRunnable);
+                    
verify(taskExecutionRunnable.getWorkflowEventBus()).publish(
+                            argThat(event -> event instanceof 
TaskFailedLifecycleEvent &&
+                                    ((TaskFailedLifecycleEvent) event)
+                                            .getTaskExecutionRunnable() == 
taskExecutionRunnable));
+                });
+    }
+
+    // With a one-minute dispatch limit and a first-dispatch time only 100 ms in the past, a plain
+    // TaskDispatchException must not produce a TaskFailedLifecycleEvent.
+    @Test
+    void dispatchTask_GenericTaskDispatchException_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent()
+            throws TaskDispatchException, InterruptedException {
+        TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+        taskDispatchPolicy.setDispatchTimeoutEnabled(true);
+        taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMinutes(1));
+
+        dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, taskDispatchPolicy);
+
+        ITaskExecutionRunnable taskExecutionRunnable =
+                mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() - 100);
+        // The latch is released inside dispatch(), i.e. BEFORE the exception reaches the dispatcher.
+        // Waiting for a single attempt would let the never() check below race with the failure
+        // handling, so two attempts are awaited: the second one can only start after the first
+        // failure has been handled and the task has been scheduled for a retry.
+        CountDownLatch secondAttemptLatch = new CountDownLatch(2);
+        doAnswer(invocation -> {
+            secondAttemptLatch.countDown();
+            throw new TaskDispatchException("Generic dispatch error");
+        }).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+        dispatcher.start();
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        // NOTE(review): 3 seconds mirrors the retry window used by the *_ShouldKeepRetrying tests;
+        // confirm it covers the dispatcher's retry delay.
+        assertTrue(secondAttemptLatch.await(3, TimeUnit.SECONDS));
+        verify(taskExecutionRunnable.getWorkflowEventBus(), never())
+                .publish(any(TaskFailedLifecycleEvent.class));
+    }
+
+    // Helper to mock TaskExecutionRunnable with firstDispatchTime
+    private ITaskExecutionRunnable 
mockTaskExecutionRunnableWithFirstDispatchTime(long firstDispatchTime) {
+        ITaskExecutionRunnable taskExecutionRunnable = 
mock(ITaskExecutionRunnable.class);
+        TaskInstance taskInstance = mock(TaskInstance.class);
+        WorkflowInstance workflowInstance = mock(WorkflowInstance.class);
+        WorkflowEventBus eventBus = mock(WorkflowEventBus.class);
+
+        TaskExecutionContext context = mock(TaskExecutionContext.class);
+        when(context.getFirstDispatchTime()).thenReturn(firstDispatchTime);
+
+        when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+        
when(taskExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance);
+        when(taskExecutionRunnable.getWorkflowEventBus()).thenReturn(eventBus);
+        
when(taskExecutionRunnable.getId()).thenReturn(ThreadLocalRandom.current().nextInt(1000,
 9999));
+        
when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(context);
+
+        return taskExecutionRunnable;
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 8fa8ecf68c..6277351c29 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -34,6 +34,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
 import 
org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
+import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
 import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
 import 
org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
 
@@ -1578,4 +1579,162 @@ public class WorkflowStartTestCase extends 
AbstractMasterIntegrationTestCase {
                 });
         masterContainer.assertAllResourceReleased();
     }
+
+    @Test
+    @DisplayName("Test start a workflow whose task specifies a non-existent worker group when dispatch timeout is enabled")
+    public void testTaskFail_with_workerGroupNotFoundAndTimeoutEnabled() {
+        // Enable the dispatch timeout with a 10 second limit for this test.
+        final TaskDispatchPolicy policy = new TaskDispatchPolicy();
+        policy.setDispatchTimeoutEnabled(true);
+        policy.setMaxTaskDispatchDuration(Duration.ofSeconds(10));
+        this.masterConfig.setTaskDispatchPolicy(policy);
+
+        final WorkflowTestCaseContext context = workflowTestCaseContextFactory
+                .initializeContextFromYaml("/it/start/workflow_with_worker_group_not_found.yaml");
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        workflowOperator.manualTriggerWorkflow(
+                WorkflowOperator.WorkflowTriggerDTO.builder()
+                        .workflowDefinition(workflow)
+                        .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                        .build());
+
+        // Both the single task and the workflow instance are expected to end up in FAILURE.
+        await()
+                .atMost(Duration.ofSeconds(30))
+                .untilAsserted(() -> {
+                    Assertions
+                            .assertThat(repository.queryTaskInstance(workflow))
+                            .hasSize(1)
+                            .anySatisfy(task -> {
+                                assertThat(task.getName()).isEqualTo("A");
+                                assertThat(task.getWorkerGroup()).isEqualTo("workerGroupNotFound");
+                                assertThat(task.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+                            });
+
+                    Assertions
+                            .assertThat(repository.queryWorkflowInstance(workflow))
+                            .satisfiesExactly(instance -> assertThat(instance.getState())
+                                    .isEqualTo(WorkflowExecutionStatus.FAILURE));
+                });
+
+        masterContainer.assertAllResourceReleased();
+    }
+
+    @Test
+    @DisplayName("Test start a workflow whose task specifies a non-existent worker group when dispatch timeout is disabled")
+    public void testTaskRemainsSubmittedSuccess_with_workerGroupNotFoundAndTimeoutDisabled() {
+        // Explicitly switch the dispatch timeout off for this test.
+        final TaskDispatchPolicy disabledPolicy = new TaskDispatchPolicy();
+        disabledPolicy.setDispatchTimeoutEnabled(false);
+        this.masterConfig.setTaskDispatchPolicy(disabledPolicy);
+
+        final WorkflowTestCaseContext context = workflowTestCaseContextFactory
+                .initializeContextFromYaml("/it/start/workflow_with_worker_group_not_found.yaml");
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        workflowOperator.manualTriggerWorkflow(
+                WorkflowOperator.WorkflowTriggerDTO.builder()
+                        .workflowDefinition(workflow)
+                        .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                        .build());
+
+        // The task is expected to be SUBMITTED_SUCCESS while the workflow is RUNNING_EXECUTION.
+        await()
+                .atMost(Duration.ofSeconds(30))
+                .untilAsserted(() -> {
+                    Assertions
+                            .assertThat(repository.queryTaskInstance(workflow))
+                            .hasSize(1)
+                            .anySatisfy(task -> {
+                                assertThat(task.getName()).isEqualTo("A");
+                                assertThat(task.getWorkerGroup()).isEqualTo("workerGroupNotFound");
+                                assertThat(task.getState()).isEqualTo(TaskExecutionStatus.SUBMITTED_SUCCESS);
+                            });
+
+                    Assertions
+                            .assertThat(repository.queryWorkflowInstance(workflow))
+                            .satisfiesExactly(instance -> assertThat(instance.getState())
+                                    .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION));
+                });
+
+        // The workflow is intentionally left running, so the resource cleanup check
+        // (masterContainer.assertAllResourceReleased()) is skipped here.
+    }
+
+    @Test
+    @DisplayName("Test start a workflow when no available worker and dispatch timeout is enabled")
+    public void testTaskFail_with_noAvailableWorkerAndTimeoutEnabled() {
+        // Enable the dispatch timeout with a 10 second limit for this test.
+        final TaskDispatchPolicy policy = new TaskDispatchPolicy();
+        policy.setDispatchTimeoutEnabled(true);
+        policy.setMaxTaskDispatchDuration(Duration.ofSeconds(10));
+        this.masterConfig.setTaskDispatchPolicy(policy);
+
+        final WorkflowTestCaseContext context = workflowTestCaseContextFactory
+                .initializeContextFromYaml("/it/start/workflow_with_no_available_worker.yaml");
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        workflowOperator.manualTriggerWorkflow(
+                WorkflowOperator.WorkflowTriggerDTO.builder()
+                        .workflowDefinition(workflow)
+                        .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                        .build());
+
+        // Both the single task and the workflow instance are expected to end up in FAILURE.
+        await()
+                .atMost(Duration.ofSeconds(30))
+                .untilAsserted(() -> {
+                    Assertions
+                            .assertThat(repository.queryTaskInstance(workflow))
+                            .hasSize(1)
+                            .anySatisfy(task -> {
+                                assertThat(task.getName()).isEqualTo("A");
+                                assertThat(task.getWorkerGroup()).isEqualTo("default");
+                                assertThat(task.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+                            });
+
+                    Assertions
+                            .assertThat(repository.queryWorkflowInstance(workflow))
+                            .satisfiesExactly(instance -> assertThat(instance.getState())
+                                    .isEqualTo(WorkflowExecutionStatus.FAILURE));
+                });
+
+        masterContainer.assertAllResourceReleased();
+    }
+
+    @Test
+    @DisplayName("Test start a workflow when no available worker and dispatch timeout is disabled")
+    public void testTaskRemainsSubmittedSuccess_with_noAvailableWorkerAndTimeoutDisabled() {
+        // Explicitly switch the dispatch timeout off for this test.
+        final TaskDispatchPolicy disabledPolicy = new TaskDispatchPolicy();
+        disabledPolicy.setDispatchTimeoutEnabled(false);
+        this.masterConfig.setTaskDispatchPolicy(disabledPolicy);
+
+        final WorkflowTestCaseContext context = workflowTestCaseContextFactory
+                .initializeContextFromYaml("/it/start/workflow_with_no_available_worker.yaml");
+        final WorkflowDefinition workflow = context.getOneWorkflow();
+
+        workflowOperator.manualTriggerWorkflow(
+                WorkflowOperator.WorkflowTriggerDTO.builder()
+                        .workflowDefinition(workflow)
+                        .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                        .build());
+
+        // The task is expected to be SUBMITTED_SUCCESS while the workflow is RUNNING_EXECUTION.
+        await()
+                .atMost(Duration.ofSeconds(30))
+                .untilAsserted(() -> {
+                    Assertions
+                            .assertThat(repository.queryTaskInstance(workflow))
+                            .hasSize(1)
+                            .anySatisfy(task -> {
+                                assertThat(task.getName()).isEqualTo("A");
+                                assertThat(task.getWorkerGroup()).isEqualTo("default");
+                                assertThat(task.getState()).isEqualTo(TaskExecutionStatus.SUBMITTED_SUCCESS);
+                            });
+
+                    Assertions
+                            .assertThat(repository.queryWorkflowInstance(workflow))
+                            .satisfiesExactly(instance -> assertThat(instance.getState())
+                                    .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION));
+                });
+
+        // The workflow is intentionally left running, so the resource cleanup check
+        // (masterContainer.assertAllResourceReleased()) is skipped here.
+    }
+
+
 }
diff --git a/dolphinscheduler-master/src/test/resources/application.yaml 
b/dolphinscheduler-master/src/test/resources/application.yaml
index 3aca1e3142..f485dd0c5f 100644
--- a/dolphinscheduler-master/src/test/resources/application.yaml
+++ b/dolphinscheduler-master/src/test/resources/application.yaml
@@ -73,6 +73,11 @@ master:
       cpu-usage-weight: 30
       task-thread-pool-usage-weight: 30
   worker-group-refresh-interval: 5m
+  # Task dispatch timeout check (currently disabled).
+  # When enabled, tasks not dispatched within this duration are marked as 
failed.
+  task-dispatch-policy:
+    dispatch-timeout-enabled: false
+    max-task-dispatch-duration: 1h
   command-fetch-strategy:
     type: ID_SLOT_BASED
     config:
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_no_available_worker.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_no_available_worker.yaml
new file mode 100644
index 0000000000..a4688200fb
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_no_available_worker.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2024-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_no_available_worker
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with single task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: SHELL
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_worker_group_not_found.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_worker_group_not_found.yaml
new file mode 100644
index 0000000000..abb7a05e0b
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_worker_group_not_found.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2024-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_worker_group_not_found
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with single task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: SHELL
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+    workerGroup: workerGroupNotFound
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index a8f46435c1..94b4afd2ab 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -126,6 +126,8 @@ public class TaskExecutionContext implements Serializable {
 
     private boolean failover;
 
+    private final long firstDispatchTime = System.currentTimeMillis();
+
     public int increaseDispatchFailTimes() {
         return ++dispatchFailTimes;
     }

Reply via email to