This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new bdec8b14f6 [Improvement-17179][Master] Remove 
GlobalTaskDispatchWaitingQueue (#17180)
bdec8b14f6 is described below

commit bdec8b14f6bfda7c4ca21d5449957c2da941b243
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 14 23:15:37 2025 +0800

    [Improvement-17179][Master] Remove GlobalTaskDispatchWaitingQueue (#17180)
---
 .../server/master/MasterServer.java                |   8 +-
 .../server/master/engine/WorkflowEngine.java       |   8 +-
 .../task/dispatcher/WorkerGroupDispatcher.java}    |  93 ++++++++------
 .../WorkerGroupDispatcherCoordinator.java          | 106 +++++++++++++++
 .../statemachine/TaskSubmittedStateAction.java     |  10 +-
 .../runner/GlobalTaskDispatchWaitingQueue.java     |  77 -----------
 .../GlobalTaskDispatchWaitingQueueLooper.java      | 110 ----------------
 .../runner/WorkerGroupTaskDispatcherManager.java   |  83 ------------
 .../WorkerGroupDispatcherCoordinatorTest.java      |  63 +++++++++
 .../dispatcher/WorkerGroupDispatcherTest.java}     |  91 +++++--------
 .../GlobalTaskDispatchWaitingQueueLooperTest.java  |  72 -----------
 .../runner/GlobalTaskDispatchWaitingQueueTest.java | 143 ---------------------
 .../WorkerGroupTaskDispatcherManagerTest.java      |  92 -------------
 .../task/executor/log/TaskExecutorMDCUtils.java    |   8 +-
 14 files changed, 273 insertions(+), 691 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 1f5bd646f6..f0dfdd70e2 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -38,10 +38,10 @@ import 
org.apache.dolphinscheduler.server.master.engine.WorkflowEngine;
 import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
 import 
org.apache.dolphinscheduler.server.master.engine.system.SystemEventBusFireWorker;
 import 
org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
+import 
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
-import 
org.apache.dolphinscheduler.server.master.runner.WorkerGroupTaskDispatcherManager;
 import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
 import org.apache.dolphinscheduler.service.ServiceConfiguration;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -101,7 +101,7 @@ public class MasterServer implements IStoppable {
     private MasterCoordinator masterCoordinator;
 
     @Autowired
-    private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
+    private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
 
     public static void main(String[] args) {
         
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
@@ -189,8 +189,8 @@ public class MasterServer implements IStoppable {
                 // close spring Context and will invoke method with 
@PreDestroy annotation to destroy beans.
                 // like 
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
                 SpringApplicationContext closedSpringContext = 
springApplicationContext;
-                WorkerGroupTaskDispatcherManager 
closeWorkerGroupTaskDispatcherManager =
-                        workerGroupTaskDispatcherManager) {
+                WorkerGroupDispatcherCoordinator 
closeWorkerGroupDispatcherCoordinator =
+                        workerGroupDispatcherCoordinator) {
 
             log.info("MasterServer is stopping, current cause : {}", cause);
         } catch (Exception e) {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
index 6639fc553b..a0029b417a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.engine;
 
 import org.apache.dolphinscheduler.server.master.engine.command.CommandEngine;
 import 
org.apache.dolphinscheduler.server.master.engine.executor.LogicTaskEngineDelegator;
-import 
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueueLooper;
+import 
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -37,7 +37,7 @@ public class WorkflowEngine implements AutoCloseable {
     private CommandEngine commandEngine;
 
     @Autowired
-    private GlobalTaskDispatchWaitingQueueLooper 
globalTaskDispatchWaitingQueueLooper;
+    private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
 
     @Autowired
     private LogicTaskEngineDelegator logicTaskEngineDelegator;
@@ -48,7 +48,7 @@ public class WorkflowEngine implements AutoCloseable {
 
         commandEngine.start();
 
-        globalTaskDispatchWaitingQueueLooper.start();
+        workerGroupDispatcherCoordinator.start();
 
         logicTaskEngineDelegator.start();
 
@@ -60,7 +60,7 @@ public class WorkflowEngine implements AutoCloseable {
         try (
                 final CommandEngine ignore1 = commandEngine;
                 final WorkflowEventBusCoordinator ignore2 = 
workflowEventBusCoordinator;
-                final GlobalTaskDispatchWaitingQueueLooper ignore3 = 
globalTaskDispatchWaitingQueueLooper;
+                final WorkerGroupDispatcherCoordinator ignore3 = 
workerGroupDispatcherCoordinator;
                 final LogicTaskEngineDelegator ignore5 = 
logicTaskEngineDelegator) {
             // closed the resource
         }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
similarity index 64%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
index 10991f068a..d061f29aa8 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
 
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
 import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
 
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.extern.slf4j.Slf4j;
@@ -37,7 +38,7 @@ import lombok.extern.slf4j.Slf4j;
  * 3. Ensuring thread safety and correct state transitions during task 
processing.
  */
 @Slf4j
-public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
+public class WorkerGroupDispatcher extends BaseDaemonThread {
 
     private final ITaskExecutorClient taskExecutorClient;
 
@@ -47,26 +48,16 @@ public class WorkerGroupTaskDispatcher extends 
BaseDaemonThread {
     // If it needs to be placed at the front of the queue, the queue needs to 
be re-implemented.
     private final 
PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>> 
workerGroupQueue;
 
+    private final Set<Integer> waitingDispatchTaskIds;
+
     private final AtomicBoolean runningFlag = new AtomicBoolean(false);
 
-    public WorkerGroupTaskDispatcher(String workerGroupName, 
ITaskExecutorClient taskExecutorClient) {
+    public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient 
taskExecutorClient) {
         super("WorkerGroupTaskDispatcher-" + workerGroupName);
         this.taskExecutorClient = taskExecutorClient;
         this.workerGroupQueue = new PriorityDelayQueue<>();
-    }
-
-    /**
-     * Adds a task to the worker group queue.
-     * This method wraps the given task execution object into a priority and 
delay-based task entry and adds it to the worker group queue.
-     * The task is only added if the current dispatcher status is either 
STARTED or INIT. If the dispatcher is in any other state,
-     * the task addition will fail, and a warning message will be logged.
-     *
-     * @param taskExecutionRunnable The task execution object to add to the 
queue, which implements the {@link ITaskExecutionRunnable} interface.
-     * @param delayTimeMills The delay time in milliseconds before the task 
should be executed.
-     */
-    public void addTaskToWorkerGroupQueue(ITaskExecutionRunnable 
taskExecutionRunnable,
-                                          long delayTimeMills) {
-        workerGroupQueue.add(new 
PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
+        this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
+        log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
     }
 
     @Override
@@ -80,27 +71,25 @@ public class WorkerGroupTaskDispatcher extends 
BaseDaemonThread {
         }
     }
 
-    public synchronized void close() {
-        log.info("The {} closed called but not implemented", this.getName());
-        // todo WorkerGroupTaskDispatcher thread needs to be shut down after 
the WorkerGroup is deleted.
-    }
-
     @Override
     public void run() {
         while (runningFlag.get()) {
-            dispatch();
+            PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry = 
workerGroupQueue.take();
+            ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
+            try (
+                    TaskExecutorMDCUtils.MDCAutoClosable ignore =
+                            
TaskExecutorMDCUtils.logWithMDC(taskExecutionRunnable.getId())) {
+                doDispatchTask(taskExecutionRunnable);
+            }
         }
     }
 
-    private void dispatch() {
-        PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry = 
workerGroupQueue.take();
-        ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
-        final TaskInstance taskInstance = 
taskExecutionRunnable.getTaskInstance();
+    private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
         try {
-            final TaskExecutionStatus taskStatus = taskInstance.getState();
-            if (taskStatus != TaskExecutionStatus.SUBMITTED_SUCCESS
-                    && taskStatus != TaskExecutionStatus.DELAY_EXECUTION) {
-                log.warn("The TaskInstance {} state is : {}, will not 
dispatch", taskInstance.getName(), taskStatus);
+            if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) 
{
+                log.info(
+                        "The task: {} doesn't exist in 
waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
+                        taskExecutionRunnable.getId());
                 return;
             }
             taskExecutorClient.dispatch(taskExecutionRunnable);
@@ -110,16 +99,44 @@ public class WorkerGroupTaskDispatcher extends 
BaseDaemonThread {
             // the waiting time will increase multiple of times, but will not 
exceed 60 seconds
             long waitingTimeMills = Math.min(
                     
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 
1_000L, 60_000L);
-            workerGroupQueue.add(new 
PriorityAndDelayBasedTaskEntry<>(waitingTimeMills, taskExecutionRunnable));
-            log.error("Dispatch Task: {} failed will retry after: {}/ms", 
taskInstance.getName(), waitingTimeMills, e);
+            dispatchTask(taskExecutionRunnable, waitingTimeMills);
+            log.error("Dispatch Task: {} failed will retry after: {}/ms", 
taskExecutionRunnable.getId(),
+                    waitingTimeMills, e);
         }
     }
 
     /**
-     * ony use unit test
-     * @return size
+     * Adds a task to the worker group queue.
+     * This method wraps the given task execution object into a priority and 
delay-based task entry and adds it to the worker group queue.
+     * The task is only added if the current dispatcher status is either 
STARTED or INIT. If the dispatcher is in any other state,
+     * the task addition will fail, and a warning message will be logged.
+     *
+     * @param taskExecutionRunnable The task execution object to add to the 
queue, which implements the {@link ITaskExecutionRunnable} interface.
+     * @param delayTimeMills        The delay time in milliseconds before the 
task should be executed.
      */
-    protected int queueSize() {
+    public void dispatchTask(final ITaskExecutionRunnable 
taskExecutionRunnable, final long delayTimeMills) {
+        waitingDispatchTaskIds.add(taskExecutionRunnable.getId());
+        workerGroupQueue.add(new 
PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
+    }
+
+    public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
+        return waitingDispatchTaskIds.remove(taskExecutionRunnable.getId());
+    }
+
+    public boolean existTask(ITaskExecutionRunnable taskExecutionRunnable) {
+        return waitingDispatchTaskIds.contains(taskExecutionRunnable.getId());
+    }
+
+    public synchronized void close() {
+        // todo WorkerGroupTaskDispatcher thread needs to be shut down after 
the WorkerGroup is deleted.
+        if (runningFlag.compareAndSet(true, false)) {
+            log.info("WorkerGroupDispatcher {} closed", this.getName());
+        } else {
+            log.warn("The WorkerGroupDispatcher: {} doesn't started", 
this.getName());
+        }
+    }
+
+    int queueSize() {
         return this.workerGroupQueue.size();
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
new file mode 100644
index 0000000000..1683b86d5c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
+
+import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task 
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and 
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as 
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupDispatcherCoordinator implements AutoCloseable {
+
+    @Autowired
+    private ITaskExecutorClient taskExecutorClient;
+
+    private final ConcurrentHashMap<String, WorkerGroupDispatcher> 
workerGroupDispatcherMap;
+
+    public WorkerGroupDispatcherCoordinator() {
+        workerGroupDispatcherMap = new ConcurrentHashMap<>();
+    }
+
+    public void start() {
+        log.info("WorkerGroupTaskDispatcherManager started...");
+    }
+
+    /**
+     * Dispatch task to the worker group with the specified remaining time.
+     */
+    public void dispatchTask(final ITaskExecutionRunnable 
taskExecutionRunnable,
+                             final long delayTimeMills) {
+        final String workerGroup = 
taskExecutionRunnable.getTaskInstance().getWorkerGroup();
+        
getOrCreateWorkerGroupDispatcher(workerGroup).dispatchTask(taskExecutionRunnable,
 delayTimeMills);
+        log.info("Success add Task: {} to WorkerGroupDispatcher: {}", 
taskExecutionRunnable.getId(), workerGroup);
+    }
+
+    /**
+     * Remove task from the dispatcher.
+     * <p> If the task doesn't exist in the dispatcher, it will return false, 
this means the task might already be dispatched.
+     */
+    public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
+        final String workerGroup = 
taskExecutionRunnable.getTaskInstance().getWorkerGroup();
+        boolean removed = 
getOrCreateWorkerGroupDispatcher(workerGroup).removeTask(taskExecutionRunnable);
+        if (removed) {
+            log.info("Success removed Task: {} from WorkerGroupDispatcher: {}",
+                    taskExecutionRunnable.getId(), workerGroup);
+        } else {
+            log.info("Failed to remove Task: {} from WorkerGroupDispatcher: 
{}, this task has been dispatched",
+                    taskExecutionRunnable.getId(), workerGroup);
+        }
+        return removed;
+    }
+
+    public boolean existWorkerGroup(String workerGroup) {
+        return workerGroupDispatcherMap.containsKey(workerGroup);
+    }
+
+    /**
+     * Stop all workerGroupTaskDispatchWaitingQueueLooper
+     */
+    @Override
+    public void close() throws Exception {
+        log.info("WorkerGroupDispatcherCoordinator closing");
+        for (WorkerGroupDispatcher workerGroupDispatcher : 
workerGroupDispatcherMap.values()) {
+            try {
+                workerGroupDispatcher.close();
+            } catch (Exception e) {
+                log.error("close WorkerGroupDispatcher: {} error", 
workerGroupDispatcher.getName(), e);
+            }
+        }
+        log.info("WorkerGroupDispatcherCoordinator closed...");
+    }
+
+    private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String 
workerGroup) {
+        return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
+            WorkerGroupDispatcher workerGroupDispatcher = new 
WorkerGroupDispatcher(wg, taskExecutorClient);
+            workerGroupDispatcher.start();
+            return workerGroupDispatcher;
+        });
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
index cc82c1030c..e3a46f9f71 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import 
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
@@ -35,7 +36,6 @@ import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
 import 
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -50,7 +50,7 @@ import org.springframework.stereotype.Component;
 public class TaskSubmittedStateAction extends AbstractTaskStateAction {
 
     @Autowired
-    private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
+    private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
 
     @Autowired
     private TaskInstanceDao taskInstanceDao;
@@ -107,7 +107,7 @@ public class TaskSubmittedStateAction extends 
AbstractTaskStateAction {
                     taskInstance.getDelayTime(),
                     remainTimeMills);
         }
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
 remainTimeMills);
+        workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable, 
remainTimeMills);
     }
 
     @Override
@@ -123,7 +123,7 @@ public class TaskSubmittedStateAction extends 
AbstractTaskStateAction {
                                  final ITaskExecutionRunnable 
taskExecutionRunnable,
                                  final TaskPauseLifecycleEvent taskPauseEvent) 
{
         throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
-        if 
(globalTaskDispatchWaitingQueue.markTaskExecutionRunnableRemoved(taskExecutionRunnable))
 {
+        if 
(workerGroupDispatcherCoordinator.removeTask(taskExecutionRunnable)) {
             log.info("Success pause task: {} before dispatch", 
taskExecutionRunnable.getName());
             
taskExecutionRunnable.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecutionRunnable));
             return;
@@ -144,7 +144,7 @@ public class TaskSubmittedStateAction extends 
AbstractTaskStateAction {
                                 final ITaskExecutionRunnable 
taskExecutionRunnable,
                                 final TaskKillLifecycleEvent taskKillEvent) {
         throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
-        if 
(globalTaskDispatchWaitingQueue.markTaskExecutionRunnableRemoved(taskExecutionRunnable))
 {
+        if 
(workerGroupDispatcherCoordinator.removeTask(taskExecutionRunnable)) {
             log.info("Success kill task: {} before dispatch", 
taskExecutionRunnable.getName());
             
taskExecutionRunnable.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecutionRunnable));
             return;
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
deleted file mode 100644
index 2f07be37b4..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.queue.TimeBasedTaskExecutionRunnableComparableEntry;
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.PriorityBlockingQueue;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.stereotype.Component;
-
-/**
- * The class is used to store {@link ITaskExecutionRunnable} which needs to be 
dispatched. The {@link ITaskExecutionRunnable}
- * will be stored in {@link DelayQueue}, if the {@link 
ITaskExecutionRunnable}'s delay time is 0, then it will be
- * consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
- * <p>
- * The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is 
determined by {@link ITaskExecutionRunnable#compareTo}.
- */
-@Slf4j
-@Component
-public class GlobalTaskDispatchWaitingQueue {
-
-    private final Set<Integer> waitingTaskInstanceIds = 
ConcurrentHashMap.newKeySet();
-
-    private final DelayQueue<TimeBasedTaskExecutionRunnableComparableEntry> 
delayQueue =
-            new DelayQueue<>();
-
-    /**
-     * Submit a {@link ITaskExecutionRunnable} with delay time, if the delay 
time <= 0 then it can be consumed.
-     */
-    public synchronized void 
dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable 
taskExecutionRunnable,
-                                                                  long 
delayTimeMills) {
-        
waitingTaskInstanceIds.add(taskExecutionRunnable.getTaskInstance().getId());
-        delayQueue.add(new 
TimeBasedTaskExecutionRunnableComparableEntry(delayTimeMills, 
taskExecutionRunnable));
-    }
-
-    /**
-     * Consume {@link ITaskExecutionRunnable} from the {@link 
PriorityBlockingQueue}, only the delay time <= 0 can be consumed.
-     */
-    @SneakyThrows
-    public ITaskExecutionRunnable takeTaskExecuteRunnable() {
-        ITaskExecutionRunnable taskExecutionRunnable = 
(ITaskExecutionRunnable) delayQueue.take().getData();
-        while (!markTaskExecutionRunnableRemoved(taskExecutionRunnable)) {
-            taskExecutionRunnable = (ITaskExecutionRunnable) 
delayQueue.take().getData();
-        }
-        return taskExecutionRunnable;
-    }
-
-    public int getWaitingDispatchTaskNumber() {
-        return waitingTaskInstanceIds.size();
-    }
-
-    public synchronized boolean 
markTaskExecutionRunnableRemoved(ITaskExecutionRunnable taskExecutionRunnable) {
-        return 
waitingTaskInstanceIds.remove(taskExecutionRunnable.getTaskInstance().getId());
-    }
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
deleted file mode 100644
index 33a9b51b8d..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread 
implements AutoCloseable {
-
-    @Autowired
-    private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
-
-    @Autowired
-    private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
-
-    private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
-
-    public GlobalTaskDispatchWaitingQueueLooper() {
-        super("GlobalTaskDispatchWaitingQueueLooper");
-    }
-
-    @Override
-    public synchronized void start() {
-        if (!RUNNING_FLAG.compareAndSet(false, true)) {
-            log.error("The GlobalTaskDispatchWaitingQueueLooper already 
started, will not start again");
-            return;
-        }
-        log.info("GlobalTaskDispatchWaitingQueueLooper starting...");
-        super.start();
-        log.info("GlobalTaskDispatchWaitingQueueLooper started...");
-    }
-
-    @Override
-    public void run() {
-        while (RUNNING_FLAG.get()) {
-            doDispatch();
-        }
-    }
-
-    void doDispatch() {
-        final ITaskExecutionRunnable taskExecutionRunnable = 
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
-        final TaskInstance taskInstance = 
taskExecutionRunnable.getTaskInstance();
-        try {
-            final TaskExecutionStatus status = taskInstance.getState();
-            if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != 
TaskExecutionStatus.DELAY_EXECUTION) {
-                log.warn("The TaskInstance {} state is : {}, will not 
dispatch", taskInstance.getName(), status);
-                return;
-            }
-            this.dispatchTaskToWorkerGroup(taskExecutionRunnable);
-        } catch (Exception e) {
-            this.delayRetryDispatch(taskExecutionRunnable, e);
-        }
-    }
-
-    private void delayRetryDispatch(ITaskExecutionRunnable 
taskExecutionRunnable, Exception e) {
-        // If dispatch failed, will put the task back to the queue
-        // The task will be dispatched after waiting time.
-        // the waiting time will increase multiple of times, but will not 
exceed 60 seconds
-        long waitingTimeMills = Math.min(
-                
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 
1_000L, 60_000L);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
-                waitingTimeMills);
-        log.error("Dispatch Task: {} failed will retry after: {}/ms", 
taskExecutionRunnable.getTaskInstance().getName(),
-                waitingTimeMills, e);
-    }
-
-    private void dispatchTaskToWorkerGroup(ITaskExecutionRunnable 
taskExecutionRunnable) {
-        workerGroupTaskDispatcherManager.addTaskToWorkerGroup(
-                taskExecutionRunnable.getTaskInstance().getWorkerGroup(),
-                taskExecutionRunnable, 0);
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (RUNNING_FLAG.compareAndSet(true, false)) {
-            log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
-            workerGroupTaskDispatcherManager.close();
-            log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
-        } else {
-            log.error("GlobalTaskDispatchWaitingQueueLooper is not started");
-        }
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
deleted file mode 100644
index 456f2cd94c..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * WorkerGroupTaskDispatcherManager is responsible for managing the task 
dispatching for worker groups.
- * It maintains a mapping of worker groups to their task dispatchers and 
priority delay queues,
- * and supports adding tasks, starting and stopping worker groups, as well as 
cleaning up resources upon shutdown.
- */
-@Component
-@Slf4j
-public class WorkerGroupTaskDispatcherManager implements AutoCloseable {
-
-    @Autowired
-    private ITaskExecutorClient taskExecutorClient;
-
-    @Getter
-    private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> 
dispatchWorkerMap;
-
-    public WorkerGroupTaskDispatcherManager() {
-        dispatchWorkerMap = new ConcurrentHashMap<>();
-    }
-
-    /**
-     * Adds a task to the specified worker group queue and starts or wakes up 
the corresponding processing loop.
-     *
-     * @param workerGroup the identifier for the worker group, used to 
distinguish different task queues
-     * @param taskExecutionRunnable an instance of ITaskExecutionRunnable 
representing the task to be executed
-     * @param delayTimeMills the delay time before the task is executed, in 
milliseconds
-     */
-    public synchronized void addTaskToWorkerGroup(String workerGroup, 
ITaskExecutionRunnable taskExecutionRunnable,
-                                                  long delayTimeMills) {
-        WorkerGroupTaskDispatcher workerGroupTaskDispatcher = 
dispatchWorkerMap.computeIfAbsent(
-                workerGroup, key -> new WorkerGroupTaskDispatcher(workerGroup, 
taskExecutorClient));
-        if (!workerGroupTaskDispatcher.isAlive()) {
-            workerGroupTaskDispatcher.start();
-        }
-        
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 
delayTimeMills);
-    }
-
-    /**
-     * Stop all workerGroupTaskDispatchWaitingQueueLooper
-     */
-    @Override
-    public void close() throws Exception {
-        log.info("WorkerGroupTaskDispatcherManager start close");
-        for (Map.Entry<String, WorkerGroupTaskDispatcher> entry : 
dispatchWorkerMap.entrySet()) {
-            try {
-                entry.getValue().close();
-            } catch (Exception e) {
-                log.error("close worker group error", e);
-            }
-        }
-        log.info("WorkerGroupTaskDispatcherManager closed");
-    }
-}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
new file mode 100644
index 0000000000..9d39de3a20
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class WorkerGroupDispatcherCoordinatorTest {
+
+    @InjectMocks
+    private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
+
+    @Mock
+    private ITaskExecutorClient taskExecutorClient;
+
+    @Test
+    void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
+        String workerGroup = "newGroup";
+        long delayTimeMills = 1000;
+
+        ITaskExecutionRunnable taskExecutionRunnable = 
Mockito.mock(ITaskExecutionRunnable.class);
+        TaskInstance taskInstance = mock(TaskInstance.class);
+
+        when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+        when(taskInstance.getWorkerGroup()).thenReturn(workerGroup);
+
+        
assertFalse(workerGroupDispatcherCoordinator.existWorkerGroup(workerGroup));
+
+        workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable, 
delayTimeMills);
+
+        
assertTrue(workerGroupDispatcherCoordinator.existWorkerGroup(workerGroup));
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
similarity index 53%
rename from 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
rename to 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
index a4be0d4213..7f5b0f4c8c 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
@@ -15,102 +15,77 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
 
-import static org.apache.dolphinscheduler.common.thread.ThreadUtils.sleep;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
 
 import java.time.Duration;
 
-import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-class WorkerGroupTaskDispatcherTest {
+class WorkerGroupDispatcherTest {
 
-    private WorkerGroupTaskDispatcher dispatcher;
+    private WorkerGroupDispatcher dispatcher;
     private ITaskExecutorClient taskExecutorClient;
 
     @BeforeEach
     void setUp() {
         taskExecutorClient = mock(ITaskExecutorClient.class);
-        dispatcher = new WorkerGroupTaskDispatcher("TestGroup", 
taskExecutorClient);
+        dispatcher = new WorkerGroupDispatcher("TestGroup", 
taskExecutorClient);
     }
 
     @Test
-    void addTaskToWorkerGroupQueue_StatusAllowed_TaskAdded() {
-        // Arrange
+    void dispatchTask() {
         ITaskExecutionRunnable taskExecutionRunnable = 
mock(ITaskExecutionRunnable.class);
         TaskInstance taskInstance = mock(TaskInstance.class);
         when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-        
doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
         dispatcher.start();
 
-        // Act
-        dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
-
-        // Assert
-        assertFalse(dispatcher.queueSize() == 0);
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+        await()
+                .atMost(Duration.ofSeconds(5))
+                .untilAsserted(() -> verify(taskExecutorClient, 
times(1)).dispatch(taskExecutionRunnable));
     }
 
     @Test
-    void addTaskToWorkerGroupQueue_StatusNotAllowed_TaskNotAdded() {
-        // Arrange
+    void dispatchTask_withDelay() {
         ITaskExecutionRunnable taskExecutionRunnable = 
mock(ITaskExecutionRunnable.class);
         TaskInstance taskInstance = mock(TaskInstance.class);
         when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-        
doReturn(TaskExecutionStatus.RUNNING_EXECUTION).when(taskInstance).getState();
-
-        // Act
-        dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
-
-        // Assert
-        assertTrue(dispatcher.queueSize() > 0);
-    }
-
-    @Test
-    void start_DispatcherStartsSuccessfully() {
-        // Act
         dispatcher.start();
 
-        // Assert
-        assertTrue(dispatcher.isAlive());
+        dispatcher.dispatchTask(taskExecutionRunnable, 2000);
+        await()
+                .atLeast(Duration.ofMillis(1500))
+                .untilAsserted(() -> verify(taskExecutorClient, 
times(1)).dispatch(taskExecutionRunnable));
     }
 
     @Test
-    void dispatch_TaskDispatchedSuccessfully() throws TaskDispatchException {
-        // Arrange
+    void dispatchTask_HasBeenRemoved() {
         ITaskExecutionRunnable taskExecutionRunnable = 
mock(ITaskExecutionRunnable.class);
         TaskInstance taskInstance = mock(TaskInstance.class);
         when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-        
doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
-        doNothing().when(taskExecutorClient).dispatch(any());
-        dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
 
-        // Act
-        dispatcher.start();
-        sleep(100); // Give some time for the dispatcher to run
-        dispatcher.close();
-        Awaitility.await()
-                .atMost(Duration.ofSeconds(1)).untilAsserted(
-                        () -> verify(taskExecutorClient, 
atLeastOnce()).dispatch(taskExecutionRunnable));
-        // Assert
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+        dispatcher.removeTask(taskExecutionRunnable);
 
+        dispatcher.start();
+        await()
+                .pollDelay(Duration.ofSeconds(2))
+                .untilAsserted(() -> verify(taskExecutorClient, 
times(0)).dispatch(taskExecutionRunnable));
     }
 
     @Test
@@ -119,18 +94,16 @@ class WorkerGroupTaskDispatcherTest {
         ITaskExecutionRunnable taskExecutionRunnable = 
mock(ITaskExecutionRunnable.class);
         TaskInstance taskInstance = mock(TaskInstance.class);
         when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-        
doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
-        doThrow(new 
RuntimeException()).when(taskExecutorClient).dispatch(any());
-        dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+        when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(new 
TaskExecutionContext());
 
-        // Act
+        doThrow(new 
RuntimeException()).when(taskExecutorClient).dispatch(any());
         dispatcher.start();
-        sleep(100); // Give some time for the dispatcher to run
-        dispatcher.close();
 
-        // Assert
-        Awaitility.await()
-                .atMost(Duration.ofSeconds(1)).untilAsserted(
-                        () -> verify(taskExecutorClient, 
atLeastOnce()).dispatch(any()));
+        dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+        await()
+                .pollDelay(Duration.ofSeconds(2))
+                .untilAsserted(() -> verify(taskExecutorClient, 
times(2)).dispatch(taskExecutionRunnable));
     }
+
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
deleted file mode 100644
index 0cc5571f01..0000000000
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
-class GlobalTaskDispatchWaitingQueueLooperTest {
-
-    @InjectMocks
-    private GlobalTaskDispatchWaitingQueueLooper 
globalTaskDispatchWaitingQueueLooper;
-
-    @Mock
-    private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
-
-    @Mock
-    private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
-
-    @Mock
-    private ITaskExecutionRunnable taskExecutionRunnable;
-
-    @Test
-    void testTaskExecutionRunnableStatusIsSubmittedNoWorkerGroup() {
-
-        
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(taskExecutionRunnable);
-        TaskInstance taskInstance = mock(TaskInstance.class);
-        
when(taskInstance.getState()).thenReturn(TaskExecutionStatus.SUBMITTED_SUCCESS);
-        when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-        
when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(mock(TaskExecutionContext.class));
-        globalTaskDispatchWaitingQueueLooper.doDispatch();
-
-        verify(workerGroupTaskDispatcherManager, 
times(1)).addTaskToWorkerGroup(any(),
-                any(ITaskExecutionRunnable.class),
-                anyLong());
-
-    }
-
-}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
deleted file mode 100644
index 209c1b4376..0000000000
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
-import 
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
-
-import org.apache.commons.lang3.RandomUtils;
-
-import java.time.Duration;
-import java.util.Date;
-import java.util.concurrent.CompletableFuture;
-
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionTimeoutException;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.context.ApplicationContext;
-
-@ExtendWith(MockitoExtension.class)
-class GlobalTaskDispatchWaitingQueueTest {
-
-    @InjectMocks
-    private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
-
-    @Test
-    void submitTaskExecuteRunnable() {
-        ITaskExecutionRunnable iTaskExecutionRunnable = 
createTaskExecuteRunnable();
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable,
 500);
-        Awaitility.await()
-                .atMost(Duration.ofSeconds(1))
-                .untilAsserted(
-                        () -> 
Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
-    }
-
-    @Test
-    void testSubmitTaskExecuteRunnableWithDelay() {
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(createTaskExecuteRunnable(),
 3_000L);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(createTaskExecuteRunnable(),
 500);
-
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
-    }
-
-    @Test
-    void takeTaskExecuteRunnable_NoElementShouldBlock() {
-        CompletableFuture<Void> completableFuture =
-                CompletableFuture.runAsync(() -> 
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable());
-        assertThrowsExactly(ConditionTimeoutException.class,
-                () -> await()
-                        .atLeast(Duration.ofSeconds(2))
-                        .timeout(Duration.ofSeconds(3))
-                        .until(completableFuture::isDone));
-    }
-
-    @Test
-    void takeTaskExecuteRunnable_withDifferentTaskInstanceDelay() {
-        ITaskExecutionRunnable taskExecutionRunnable1 = 
createTaskExecuteRunnable();
-        taskExecutionRunnable1.getTaskInstance().setId(1);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable1,
 0);
-
-        ITaskExecutionRunnable iTaskExecutionRunnable2 = 
createTaskExecuteRunnable();
-        iTaskExecutionRunnable2.getTaskInstance().setId(2);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable2,
 1);
-
-        ITaskExecutionRunnable iTaskExecutionRunnable3 = 
createTaskExecuteRunnable();
-        iTaskExecutionRunnable3.getTaskInstance().setId(3);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable3,
 2);
-
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
-                .isEqualTo(1);
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
-                .isEqualTo(2);
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
-                .isEqualTo(3);
-    }
-
-    @Test
-    void getWaitingDispatchTaskNumber() {
-        Assertions.assertEquals(0, 
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
-        ITaskExecutionRunnable iTaskExecutionRunnable = 
createTaskExecuteRunnable();
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable,
 500);
-        Assertions.assertEquals(1, 
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
-    }
-
-    private ITaskExecutionRunnable createTaskExecuteRunnable() {
-        WorkflowInstance workflowInstance = new WorkflowInstance();
-        workflowInstance.setWorkflowInstancePriority(Priority.MEDIUM);
-
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(RandomUtils.nextInt());
-        taskInstance.setTaskInstancePriority(Priority.MEDIUM);
-        taskInstance.setFirstSubmitTime(new Date());
-
-        final ApplicationContext applicationContext = 
mock(ApplicationContext.class);
-        when(applicationContext.getBean(TaskExecutionContextFactory.class))
-                .thenReturn(mock(TaskExecutionContextFactory.class));
-        final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = 
TaskExecutionRunnableBuilder.builder()
-                .applicationContext(applicationContext)
-                .workflowInstance(workflowInstance)
-                .taskInstance(taskInstance)
-                .workflowExecutionGraph(new WorkflowExecutionGraph())
-                .workflowDefinition(new WorkflowDefinition())
-                .project(new Project())
-                .taskDefinition(new TaskDefinition())
-                .workflowEventBus(new WorkflowEventBus())
-                .build();
-        return new TaskExecutionRunnable(taskExecutionRunnableBuilder);
-    }
-}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
deleted file mode 100644
index 6bfe96e4b5..0000000000
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.test.util.ReflectionTestUtils;
-
-@ExtendWith(MockitoExtension.class)
-class WorkerGroupTaskDispatcherManagerTest {
-
-    @InjectMocks
-    private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
-
-    @Mock
-    private ITaskExecutionRunnable taskExecutionRunnable;
-
-    @Test
-    void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
-        String workerGroup = "newGroup";
-        long delayTimeMills = 1000;
-
-        workerGroupTaskDispatcherManager.addTaskToWorkerGroup(workerGroup, 
taskExecutionRunnable, delayTimeMills);
-
-        ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap 
=
-                workerGroupTaskDispatcherManager.getDispatchWorkerMap();
-
-        assertTrue(dispatchWorkerMap.containsKey(workerGroup));
-    }
-
-    @Test
-    void addTaskToWorkerGroup_ExistingWorkerGroup_ShouldAddTask() {
-        String workerGroup = "existingGroup";
-        long delayTimeMills = 1000;
-
-        WorkerGroupTaskDispatcher mockDispatcher = 
mock(WorkerGroupTaskDispatcher.class);
-
-        ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap 
= new ConcurrentHashMap<>();
-        dispatchWorkerMap.put(workerGroup, mockDispatcher);
-
-        ReflectionTestUtils.setField(workerGroupTaskDispatcherManager, 
"dispatchWorkerMap", dispatchWorkerMap);
-        doNothing().when(mockDispatcher).start();
-        workerGroupTaskDispatcherManager.addTaskToWorkerGroup(workerGroup, 
taskExecutionRunnable, delayTimeMills);
-
-        verify(mockDispatcher, 
times(1)).addTaskToWorkerGroupQueue(taskExecutionRunnable, delayTimeMills);
-    }
-
-    @Test
-    void close_ShouldCloseAllWorkerGroups() throws Exception {
-        WorkerGroupTaskDispatcher mockDispatcher1 = 
mock(WorkerGroupTaskDispatcher.class);
-        WorkerGroupTaskDispatcher mockDispatcher2 = 
mock(WorkerGroupTaskDispatcher.class);
-
-        ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap 
= new ConcurrentHashMap<>();
-        dispatchWorkerMap.put("group1", mockDispatcher1);
-        dispatchWorkerMap.put("group2", mockDispatcher2);
-
-        ReflectionTestUtils.setField(workerGroupTaskDispatcherManager, 
"dispatchWorkerMap", dispatchWorkerMap);
-
-        workerGroupTaskDispatcherManager.close();
-
-        verify(mockDispatcher1, times(1)).close();
-        verify(mockDispatcher2, times(1)).close();
-    }
-}
diff --git 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
index f0c855c36f..4fc57137e9 100644
--- 
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
+++ 
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
@@ -31,17 +31,17 @@ public class TaskExecutorMDCUtils {
         return logWithMDC(taskExecutor.getId(), 
taskExecutor.getTaskExecutionContext().getLogPath());
     }
 
-    public static MDCAutoClosable logWithMDC(final int taskExecutorId) {
-        return logWithMDC(taskExecutorId, null);
+    public static MDCAutoClosable logWithMDC(final int taskInstanceId) {
+        return logWithMDC(taskInstanceId, null);
     }
 
-    public static MDCAutoClosable logWithMDC(final int taskExecutorId, final 
String logPath) {
+    public static MDCAutoClosable logWithMDC(final int taskInstanceId, final 
String logPath) {
 
         if (logPath != null) {
             MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, logPath);
         }
 
-        MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, 
String.valueOf(taskExecutorId));
+        MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, 
String.valueOf(taskInstanceId));
 
         return () -> {
             MDC.remove(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY);

Reply via email to