This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f5469d7741 [DSIP-55][Master] Separate the waiting dispatched task into 
different queue by worker group (#17037)
f5469d7741 is described below

commit f5469d7741d3333c7c22985f7805c3bca685d2a7
Author: [email protected] <[email protected]>
AuthorDate: Thu Apr 17 09:18:13 2025 +0800

    [DSIP-55][Master] Separate the waiting dispatched task into different queue 
by worker group (#17037)
---
 .../server/master/MasterServer.java                |   9 +-
 .../runner/GlobalTaskDispatchWaitingQueue.java     |  27 ++--
 .../GlobalTaskDispatchWaitingQueueLooper.java      |  33 +++--
 .../master/runner/WorkerGroupTaskDispatcher.java   | 125 +++++++++++++++++++
 .../runner/WorkerGroupTaskDispatcherManager.java   |  83 +++++++++++++
 ...ue.java => PriorityAndDelayBasedTaskEntry.java} |  31 +++--
 .../master/runner/queue/PriorityDelayQueue.java    |   3 +
 ...eBasedTaskExecutionRunnableComparableEntry.java |  77 ++++++++++++
 .../GlobalTaskDispatchWaitingQueueLooperTest.java  |  83 +++----------
 .../runner/GlobalTaskDispatchWaitingQueueTest.java |  99 +++------------
 .../WorkerGroupTaskDispatcherManagerTest.java      |  92 ++++++++++++++
 .../runner/WorkerGroupTaskDispatcherTest.java      | 136 +++++++++++++++++++++
 .../runner/queue/PriorityDelayQueueTest.java       |  75 ++++++++++++
 ...edTaskExecutionRunnableComparableEntryTest.java |  91 ++++++++++++++
 14 files changed, 778 insertions(+), 186 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 3afd4d0990..1f5bd646f6 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -41,6 +41,7 @@ import 
org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMaste
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
+import 
org.apache.dolphinscheduler.server.master.runner.WorkerGroupTaskDispatcherManager;
 import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
 import org.apache.dolphinscheduler.service.ServiceConfiguration;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -99,6 +100,9 @@ public class MasterServer implements IStoppable {
     @Autowired
     private MasterCoordinator masterCoordinator;
 
+    @Autowired
+    private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
+
     public static void main(String[] args) {
         
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
 
@@ -129,6 +133,7 @@ public class MasterServer implements IStoppable {
         this.masterCoordinator.start();
 
         this.clusterManager.start();
+
         this.clusterStateMonitors.start();
 
         this.workflowEngine.start();
@@ -183,7 +188,9 @@ public class MasterServer implements IStoppable {
                 MasterRegistryClient closedMasterRegistryClient = 
masterRegistryClient;
                 // close spring Context and will invoke method with 
@PreDestroy annotation to destroy beans.
                 // like 
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
-                SpringApplicationContext closedSpringContext = 
springApplicationContext) {
+                SpringApplicationContext closedSpringContext = 
springApplicationContext;
+                WorkerGroupTaskDispatcherManager 
closeWorkerGroupTaskDispatcherManager =
+                        workerGroupTaskDispatcherManager) {
 
             log.info("MasterServer is stopping, current cause : {}", cause);
         } catch (Exception e) {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
index b657437f9f..2f07be37b4 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
@@ -18,11 +18,12 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import org.apache.dolphinscheduler.server.master.runner.queue.DelayEntry;
-import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+import 
org.apache.dolphinscheduler.server.master.runner.queue.TimeBasedTaskExecutionRunnableComparableEntry;
 
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -31,25 +32,19 @@ import org.springframework.stereotype.Component;
 
 /**
  * The class is used to store {@link ITaskExecutionRunnable} which needs to be 
dispatched. The {@link ITaskExecutionRunnable}
- * will be stored in {@link PriorityDelayQueue}, if the {@link 
ITaskExecutionRunnable}'s delay time is 0, then it will be
+ * will be stored in {@link DelayQueue}, if the {@link 
ITaskExecutionRunnable}'s delay time is 0, then it will be
  * consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
  * <p>
- * The order of {@link ITaskExecutionRunnable} in the {@link 
PriorityDelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
+ * The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is determined by the remaining delay of each entry (see {@link TimeBasedTaskExecutionRunnableComparableEntry#compareTo}).
  */
 @Slf4j
 @Component
 public class GlobalTaskDispatchWaitingQueue {
 
     private final Set<Integer> waitingTaskInstanceIds = 
ConcurrentHashMap.newKeySet();
-    private final PriorityDelayQueue<DelayEntry<ITaskExecutionRunnable>> 
priorityDelayQueue =
-            new PriorityDelayQueue<>();
 
-    /**
-     * Submit a {@link ITaskExecutionRunnable} with delay time 0, it will be 
consumed immediately.
-     */
-    public synchronized void 
dispatchTaskExecuteRunnable(ITaskExecutionRunnable ITaskExecutionRunnable) {
-        dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable, 0);
-    }
+    private final DelayQueue<TimeBasedTaskExecutionRunnableComparableEntry> 
delayQueue =
+            new DelayQueue<>();
 
     /**
      * Submit a {@link ITaskExecutionRunnable} with delay time, if the delay 
time <= 0 then it can be consumed.
@@ -57,17 +52,17 @@ public class GlobalTaskDispatchWaitingQueue {
     public synchronized void 
dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable 
taskExecutionRunnable,
                                                                   long 
delayTimeMills) {
         
waitingTaskInstanceIds.add(taskExecutionRunnable.getTaskInstance().getId());
-        priorityDelayQueue.add(new DelayEntry<>(delayTimeMills, 
taskExecutionRunnable));
+        delayQueue.add(new 
TimeBasedTaskExecutionRunnableComparableEntry(delayTimeMills, 
taskExecutionRunnable));
     }
 
     /**
-     * Consume {@link ITaskExecutionRunnable} from the {@link 
PriorityDelayQueue}, only the delay time <= 0 can be consumed.
+     * Consume {@link ITaskExecutionRunnable} from the {@link DelayQueue}, only the delay time <= 0 can be consumed.
      */
     @SneakyThrows
     public ITaskExecutionRunnable takeTaskExecuteRunnable() {
-        ITaskExecutionRunnable taskExecutionRunnable = 
priorityDelayQueue.take().getData();
+        ITaskExecutionRunnable taskExecutionRunnable = 
(ITaskExecutionRunnable) delayQueue.take().getData();
         while (!markTaskExecutionRunnableRemoved(taskExecutionRunnable)) {
-            taskExecutionRunnable = priorityDelayQueue.take().getData();
+            taskExecutionRunnable = (ITaskExecutionRunnable) 
delayQueue.take().getData();
         }
         return taskExecutionRunnable;
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
index 23883da8b6..33a9b51b8d 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.runner;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +37,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends 
BaseDaemonThread imple
     private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
 
     @Autowired
-    private ITaskExecutorClient taskExecutorClient;
+    private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
 
     private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
 
@@ -73,23 +72,35 @@ public class GlobalTaskDispatchWaitingQueueLooper extends 
BaseDaemonThread imple
                 log.warn("The TaskInstance {} state is : {}, will not 
dispatch", taskInstance.getName(), status);
                 return;
             }
-            taskExecutorClient.dispatch(taskExecutionRunnable);
+            this.dispatchTaskToWorkerGroup(taskExecutionRunnable);
         } catch (Exception e) {
-            // If dispatch failed, will put the task back to the queue
-            // The task will be dispatched after waiting time.
-            // the waiting time will increase multiple of times, but will not 
exceed 60 seconds
-            long waitingTimeMills = Math.min(
-                    
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 
1_000L, 60_000L);
-            
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
-                    waitingTimeMills);
-            log.error("Dispatch Task: {} failed will retry after: {}/ms", 
taskInstance.getName(), waitingTimeMills, e);
+            this.delayRetryDispatch(taskExecutionRunnable, e);
         }
     }
 
+    private void delayRetryDispatch(ITaskExecutionRunnable 
taskExecutionRunnable, Exception e) {
+        // If dispatch failed, will put the task back to the queue
+        // The task will be dispatched after waiting time.
+        // the waiting time will increase multiple of times, but will not 
exceed 60 seconds
+        long waitingTimeMills = Math.min(
+                
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 
1_000L, 60_000L);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
+                waitingTimeMills);
+        log.error("Dispatch Task: {} failed will retry after: {}/ms", 
taskExecutionRunnable.getTaskInstance().getName(),
+                waitingTimeMills, e);
+    }
+
+    private void dispatchTaskToWorkerGroup(ITaskExecutionRunnable 
taskExecutionRunnable) {
+        workerGroupTaskDispatcherManager.addTaskToWorkerGroup(
+                taskExecutionRunnable.getTaskInstance().getWorkerGroup(),
+                taskExecutionRunnable, 0);
+    }
+
     @Override
     public void close() throws Exception {
         if (RUNNING_FLAG.compareAndSet(true, false)) {
             log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
+            workerGroupTaskDispatcherManager.close();
             log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
         } else {
             log.error("GlobalTaskDispatchWaitingQueueLooper is not started");
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
new file mode 100644
index 0000000000..10991f068a
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
+import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the 
task queue.
+ * The main responsibilities include:
+ * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for 
dispatch.
+ * 2. Re-queuing tasks that fail to dispatch according to retry logic.
+ * 3. Ensuring thread safety and correct state transitions during task 
processing.
+ */
+@Slf4j
+public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
+
+    private final ITaskExecutorClient taskExecutorClient;
+
+    // TODO The current queue is flawed. When a high-priority task fails,
+    // it will be delayed and will not return to the first or second position.
+    // Tasks with the same priority will preempt its position.
+    // If it needs to be placed at the front of the queue, the queue needs to 
be re-implemented.
+    private final 
PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>> 
workerGroupQueue;
+
+    private final AtomicBoolean runningFlag = new AtomicBoolean(false);
+
+    public WorkerGroupTaskDispatcher(String workerGroupName, 
ITaskExecutorClient taskExecutorClient) {
+        super("WorkerGroupTaskDispatcher-" + workerGroupName);
+        this.taskExecutorClient = taskExecutorClient;
+        this.workerGroupQueue = new PriorityDelayQueue<>();
+    }
+
+    /**
+     * Adds a task to the worker group queue.
+     * This method wraps the given task execution object into a priority and 
delay-based task entry and adds it to the worker group queue.
+     * The task is added unconditionally: this method does not check whether the dispatcher thread has been started or closed,
+     * and it does not log anything.
+     *
+     * @param taskExecutionRunnable The task execution object to add to the 
queue, which implements the {@link ITaskExecutionRunnable} interface.
+     * @param delayTimeMills The delay time in milliseconds before the task 
should be executed.
+     */
+    public void addTaskToWorkerGroupQueue(ITaskExecutionRunnable 
taskExecutionRunnable,
+                                          long delayTimeMills) {
+        workerGroupQueue.add(new 
PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
+    }
+
+    @Override
+    public synchronized void start() {
+        if (runningFlag.compareAndSet(false, true)) {
+            log.info("The {} starting...", this.getName());
+            super.start();
+            log.info("The {}  started", this.getName());
+        } else {
+            log.error("The {} status is {}, will not start again", 
this.getName(), runningFlag.get());
+        }
+    }
+
+    public synchronized void close() {
+        log.info("The {} closed called but not implemented", this.getName());
+        // todo WorkerGroupTaskDispatcher thread needs to be shut down after 
the WorkerGroup is deleted.
+    }
+
+    @Override
+    public void run() {
+        while (runningFlag.get()) {
+            dispatch();
+        }
+    }
+
+    private void dispatch() {
+        PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry = 
workerGroupQueue.take();
+        ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
+        final TaskInstance taskInstance = 
taskExecutionRunnable.getTaskInstance();
+        try {
+            final TaskExecutionStatus taskStatus = taskInstance.getState();
+            if (taskStatus != TaskExecutionStatus.SUBMITTED_SUCCESS
+                    && taskStatus != TaskExecutionStatus.DELAY_EXECUTION) {
+                log.warn("The TaskInstance {} state is : {}, will not 
dispatch", taskInstance.getName(), taskStatus);
+                return;
+            }
+            taskExecutorClient.dispatch(taskExecutionRunnable);
+        } catch (Exception e) {
+            // If dispatch failed, will put the task back to the queue
+            // The task will be dispatched after waiting time.
+            // the waiting time will increase multiple of times, but will not 
exceed 60 seconds
+            long waitingTimeMills = Math.min(
+                    
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 
1_000L, 60_000L);
+            workerGroupQueue.add(new 
PriorityAndDelayBasedTaskEntry<>(waitingTimeMills, taskExecutionRunnable));
+            log.error("Dispatch Task: {} failed will retry after: {}/ms", 
taskInstance.getName(), waitingTimeMills, e);
+        }
+    }
+
+    /**
+     * Only used by unit tests.
+     * @return size
+     */
+    protected int queueSize() {
+        return this.workerGroupQueue.size();
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
new file mode 100644
index 0000000000..456f2cd94c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task 
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and 
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as 
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupTaskDispatcherManager implements AutoCloseable {
+
+    @Autowired
+    private ITaskExecutorClient taskExecutorClient;
+
+    @Getter
+    private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> 
dispatchWorkerMap;
+
+    public WorkerGroupTaskDispatcherManager() {
+        dispatchWorkerMap = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Adds a task to the specified worker group queue and starts or wakes up 
the corresponding processing loop.
+     *
+     * @param workerGroup the identifier for the worker group, used to 
distinguish different task queues
+     * @param taskExecutionRunnable an instance of ITaskExecutionRunnable 
representing the task to be executed
+     * @param delayTimeMills the delay time before the task is executed, in 
milliseconds
+     */
+    public synchronized void addTaskToWorkerGroup(String workerGroup, 
ITaskExecutionRunnable taskExecutionRunnable,
+                                                  long delayTimeMills) {
+        WorkerGroupTaskDispatcher workerGroupTaskDispatcher = 
dispatchWorkerMap.computeIfAbsent(
+                workerGroup, key -> new WorkerGroupTaskDispatcher(workerGroup, 
taskExecutorClient));
+        if (!workerGroupTaskDispatcher.isAlive()) {
+            workerGroupTaskDispatcher.start();
+        }
+        
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 
delayTimeMills);
+    }
+
+    /**
+     * Calls {@code close()} on every registered {@link WorkerGroupTaskDispatcher}.
+     */
+    @Override
+    public void close() throws Exception {
+        log.info("WorkerGroupTaskDispatcherManager start close");
+        for (Map.Entry<String, WorkerGroupTaskDispatcher> entry : 
dispatchWorkerMap.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (Exception e) {
+                log.error("close worker group error", e);
+            }
+        }
+        log.info("WorkerGroupTaskDispatcherManager closed");
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
similarity index 56%
copy from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
copy to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
index 8ed4869625..bf762afb3e 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
@@ -17,25 +17,34 @@
 
 package org.apache.dolphinscheduler.server.master.runner.queue;
 
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
 
-import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
 
-public class PriorityDelayQueue<V extends DelayEntry> {
+public class PriorityAndDelayBasedTaskEntry<V extends Comparable<V>> extends 
DelayEntry<V> {
 
-    private final DelayQueue<V> queue = new DelayQueue<>();
+    public PriorityAndDelayBasedTaskEntry(long delayTimeMills, V data) {
+        super(delayTimeMills, data);
+    }
 
-    public void add(V v) {
-        queue.put(v);
+    @Override
+    public long getDelay(@NotNull TimeUnit unit) {
+        return super.getDelay(unit);
     }
 
-    @SneakyThrows
-    public V take() {
-        return queue.take();
+    @Override
+    public int compareTo(@NotNull Delayed o) {
+        return super.compareTo(o);
     }
 
-    public int size() {
-        return queue.size();
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o);
     }
 
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
index 8ed4869625..cf67281323 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
@@ -38,4 +38,7 @@ public class PriorityDelayQueue<V extends DelayEntry> {
         return queue.size();
     }
 
+    public void clear() {
+        queue.clear();
+    }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
new file mode 100644
index 0000000000..9bb47667e7
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Getter;
+
+import org.jetbrains.annotations.NotNull;
+
+public class 
TimeBasedTaskExecutionRunnableComparableEntry<ITaskExecutionRunnable> 
implements Delayed {
+
+    private final long triggerTimeMills;
+    private final long delayTimeMills;
+
+    @Getter
+    private final ITaskExecutionRunnable data;
+    public TimeBasedTaskExecutionRunnableComparableEntry(long delayTimeMills, 
ITaskExecutionRunnable data) {
+        this.delayTimeMills = delayTimeMills;
+        this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills;
+        this.data = checkNotNull(data, "data is null");
+    }
+
+    @Override
+    public long getDelay(@NotNull TimeUnit unit) {
+        long remainTimeMills = triggerTimeMills - System.currentTimeMillis();
+        if (TimeUnit.MILLISECONDS.equals(unit)) {
+            return remainTimeMills;
+        }
+        return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(@NotNull Delayed delayed) {
+        if (this == delayed) {
+            return 0;
+        }
+        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), 
delayed.getDelay(TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TimeBasedTaskExecutionRunnableComparableEntry<?> that = 
(TimeBasedTaskExecutionRunnableComparableEntry<?>) o;
+        return this.getDelay(TimeUnit.MILLISECONDS) == 
that.getDelay(TimeUnit.MILLISECONDS)
+                && Objects.equals(data, that.getData());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(data);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
index f36869cb03..0cc5571f01 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
@@ -17,31 +17,17 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import static java.time.Duration.ofSeconds;
-import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
-import 
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
-import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
-import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
-
-import java.util.HashMap;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -50,7 +36,6 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
-import org.springframework.context.ApplicationContext;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
@@ -63,61 +48,25 @@ class GlobalTaskDispatchWaitingQueueLooperTest {
     private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
 
     @Mock
-    private ITaskExecutorClient taskExecutorClient;
-
-    @Test
-    void testTaskExecutionRunnableStatusIsNotSubmitted() throws Exception {
-        WorkflowInstance workflowInstance = new WorkflowInstance();
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setState(TaskExecutionStatus.KILL);
-        taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
-        final ITaskExecutionRunnable defaultTaskExecuteRunnable =
-                createTaskExecuteRunnable(taskInstance, workflowInstance);
+    private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
 
-        doNothing().when(taskExecutorClient).dispatch(any());
-
-        
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(defaultTaskExecuteRunnable);
-        globalTaskDispatchWaitingQueueLooper.doDispatch();
-        await().during(ofSeconds(1))
-                .untilAsserted(() -> verify(taskExecutorClient, 
never()).dispatch(any()));
-        globalTaskDispatchWaitingQueueLooper.close();
-    }
+    @Mock
+    private ITaskExecutionRunnable taskExecutionRunnable;
 
     @Test
-    void testTaskExecutionRunnableStatusIsSubmitted() throws Exception {
-        WorkflowInstance workflowInstance = new WorkflowInstance();
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
-        taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
-        final ITaskExecutionRunnable defaultTaskExecuteRunnable =
-                createTaskExecuteRunnable(taskInstance, workflowInstance);
-
-        doNothing().when(taskExecutorClient).dispatch(any());
+    void testTaskExecutionRunnableStatusIsSubmittedNoWorkerGroup() {
 
-        
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(defaultTaskExecuteRunnable);
+        
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(taskExecutionRunnable);
+        TaskInstance taskInstance = mock(TaskInstance.class);
+        
when(taskInstance.getState()).thenReturn(TaskExecutionStatus.SUBMITTED_SUCCESS);
+        when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+        
when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(mock(TaskExecutionContext.class));
         globalTaskDispatchWaitingQueueLooper.doDispatch();
-        await().atMost(ofSeconds(1)).untilAsserted(() -> {
-            verify(taskExecutorClient, 
atLeastOnce()).dispatch(any(ITaskExecutionRunnable.class));
-        });
 
-    }
+        verify(workerGroupTaskDispatcherManager, 
times(1)).addTaskToWorkerGroup(any(),
+                any(ITaskExecutionRunnable.class),
+                anyLong());
 
-    private ITaskExecutionRunnable createTaskExecuteRunnable(final 
TaskInstance taskInstance,
-                                                             final 
WorkflowInstance workflowInstance) {
-
-        final ApplicationContext applicationContext = 
mock(ApplicationContext.class);
-        when(applicationContext.getBean(TaskExecutionContextFactory.class))
-                .thenReturn(mock(TaskExecutionContextFactory.class));
-        final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder = 
TaskExecutionRunnableBuilder.builder()
-                .applicationContext(applicationContext)
-                .workflowInstance(workflowInstance)
-                .taskInstance(taskInstance)
-                .workflowExecutionGraph(new WorkflowExecutionGraph())
-                .workflowDefinition(new WorkflowDefinition())
-                .project(new Project())
-                .taskDefinition(new TaskDefinition())
-                .workflowEventBus(new WorkflowEventBus())
-                .build();
-        return new TaskExecutionRunnable(taskExecutionRunnableBuilder);
     }
+
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
index 8c4ddf616c..209c1b4376 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
@@ -36,7 +36,6 @@ import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecut
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
 
 import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.time.DateUtils;
 
 import java.time.Duration;
 import java.util.Date;
@@ -45,23 +44,22 @@ import java.util.concurrent.CompletableFuture;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.springframework.context.ApplicationContext;
 
+@ExtendWith(MockitoExtension.class)
 class GlobalTaskDispatchWaitingQueueTest {
 
+    @InjectMocks
     private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
 
-    @BeforeEach
-    public void setUp() {
-        globalTaskDispatchWaitingQueue = new GlobalTaskDispatchWaitingQueue();
-    }
-
     @Test
     void submitTaskExecuteRunnable() {
-        ITaskExecutionRunnable ITaskExecutionRunnable = 
createTaskExecuteRunnable();
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable);
+        ITaskExecutionRunnable iTaskExecutionRunnable = 
createTaskExecuteRunnable();
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable,
 500);
         Awaitility.await()
                 .atMost(Duration.ofSeconds(1))
                 .untilAsserted(
@@ -71,14 +69,10 @@ class GlobalTaskDispatchWaitingQueueTest {
     @Test
     void testSubmitTaskExecuteRunnableWithDelay() {
         
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(createTaskExecuteRunnable(),
 3_000L);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(createTaskExecuteRunnable());
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(createTaskExecuteRunnable(),
 500);
 
         
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
-        Awaitility.await()
-                .atLeast(Duration.ofSeconds(2))
-                .atMost(Duration.ofSeconds(4))
-                .untilAsserted(
-                        () -> 
Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
+        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
     }
 
     @Test
@@ -93,87 +87,32 @@ class GlobalTaskDispatchWaitingQueueTest {
     }
 
     @Test
-    void takeTaskExecuteRunnable_withDifferentTaskInstancePriority() {
+    void takeTaskExecuteRunnable_withDifferentTaskInstanceDelay() {
         ITaskExecutionRunnable taskExecutionRunnable1 = 
createTaskExecuteRunnable();
         taskExecutionRunnable1.getTaskInstance().setId(1);
-        
taskExecutionRunnable1.getTaskInstance().setTaskInstancePriority(Priority.MEDIUM);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecutionRunnable1);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable1,
 0);
 
-        ITaskExecutionRunnable ITaskExecutionRunnable2 = 
createTaskExecuteRunnable();
-        ITaskExecutionRunnable2.getTaskInstance().setId(2);
-        
ITaskExecutionRunnable2.getTaskInstance().setTaskInstancePriority(Priority.HIGH);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable2);
+        ITaskExecutionRunnable iTaskExecutionRunnable2 = 
createTaskExecuteRunnable();
+        iTaskExecutionRunnable2.getTaskInstance().setId(2);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable2,
 1);
 
-        ITaskExecutionRunnable ITaskExecutionRunnable3 = 
createTaskExecuteRunnable();
-        ITaskExecutionRunnable3.getTaskInstance().setId(3);
-        
ITaskExecutionRunnable3.getTaskInstance().setTaskInstancePriority(Priority.LOW);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable3);
+        ITaskExecutionRunnable iTaskExecutionRunnable3 = 
createTaskExecuteRunnable();
+        iTaskExecutionRunnable3.getTaskInstance().setId(3);
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable3,
 2);
 
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
-                .isEqualTo(2);
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
-                .isEqualTo(1);
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
-                .isEqualTo(3);
-    }
-
-    @Test
-    void takeTaskExecuteRunnable_withDifferentTaskGroupPriority() {
-        ITaskExecutionRunnable ITaskExecutionRunnable1 = 
createTaskExecuteRunnable();
-        ITaskExecutionRunnable1.getTaskInstance().setId(1);
-        
ITaskExecutionRunnable1.getTaskInstance().setTaskGroupPriority(Priority.MEDIUM.getCode());
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable1);
-
-        ITaskExecutionRunnable ITaskExecutionRunnable2 = 
createTaskExecuteRunnable();
-        ITaskExecutionRunnable2.getTaskInstance().setId(2);
-        
ITaskExecutionRunnable2.getTaskInstance().setTaskGroupPriority(Priority.HIGH.getCode());
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable2);
-
-        ITaskExecutionRunnable ITaskExecutionRunnable3 = 
createTaskExecuteRunnable();
-        ITaskExecutionRunnable3.getTaskInstance().setId(3);
-        
ITaskExecutionRunnable3.getTaskInstance().setTaskGroupPriority(Priority.LOW.getCode());
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable3);
-
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
-                .isEqualTo(3);
         
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
                 .isEqualTo(1);
         
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
                 .isEqualTo(2);
-    }
-
-    @Test
-    void takeTaskExecuteRunnable_withDifferentSubmitTime() {
-        Date now = new Date();
-
-        ITaskExecutionRunnable ITaskExecutionRunnable1 = 
createTaskExecuteRunnable();
-        ITaskExecutionRunnable1.getTaskInstance().setId(1);
-        ITaskExecutionRunnable1.getTaskInstance().setFirstSubmitTime(now);
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable1);
-
-        ITaskExecutionRunnable ITaskExecutionRunnable2 = 
createTaskExecuteRunnable();
-        ITaskExecutionRunnable2.getTaskInstance().setId(2);
-        
ITaskExecutionRunnable2.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now,
 1));
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable2);
-
-        ITaskExecutionRunnable ITaskExecutionRunnable3 = 
createTaskExecuteRunnable();
-        ITaskExecutionRunnable3.getTaskInstance().setId(3);
-        
ITaskExecutionRunnable3.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now,
 -1));
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable3);
-
         
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
                 .isEqualTo(3);
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
-                .isEqualTo(1);
-        
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
-                .isEqualTo(2);
     }
 
     @Test
     void getWaitingDispatchTaskNumber() {
         Assertions.assertEquals(0, 
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
-        ITaskExecutionRunnable ITaskExecutionRunnable = 
createTaskExecuteRunnable();
-        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable);
+        ITaskExecutionRunnable iTaskExecutionRunnable = 
createTaskExecuteRunnable();
+        
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable,
 500);
         Assertions.assertEquals(1, 
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
     }
 
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
new file mode 100644
index 0000000000..6bfe96e4b5
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.test.util.ReflectionTestUtils;
+
+/**
+ * Unit tests for {@link WorkerGroupTaskDispatcherManager}.
+ */
+@ExtendWith(MockitoExtension.class)
+class WorkerGroupTaskDispatcherManagerTest {
+
+    @InjectMocks
+    private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
+
+    @Mock
+    private ITaskExecutionRunnable taskExecutionRunnable;
+
+    /** Adding a task for an unknown worker group registers that group in the dispatcher map. */
+    @Test
+    void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
+        final String groupName = "newGroup";
+
+        workerGroupTaskDispatcherManager.addTaskToWorkerGroup(groupName, taskExecutionRunnable, 1000L);
+
+        assertTrue(workerGroupTaskDispatcherManager.getDispatchWorkerMap().containsKey(groupName));
+    }
+
+    /** Adding a task for a known worker group forwards it to that group's existing dispatcher. */
+    @Test
+    void addTaskToWorkerGroup_ExistingWorkerGroup_ShouldAddTask() {
+        final String groupName = "existingGroup";
+        final long delayMillis = 1000;
+        final WorkerGroupTaskDispatcher existingDispatcher = mock(WorkerGroupTaskDispatcher.class);
+        doNothing().when(existingDispatcher).start();
+
+        final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchers = new ConcurrentHashMap<>();
+        dispatchers.put(groupName, existingDispatcher);
+        replaceDispatchWorkerMap(dispatchers);
+
+        workerGroupTaskDispatcherManager.addTaskToWorkerGroup(groupName, taskExecutionRunnable, delayMillis);
+
+        verify(existingDispatcher, times(1)).addTaskToWorkerGroupQueue(taskExecutionRunnable, delayMillis);
+    }
+
+    /** Closing the manager closes every registered dispatcher exactly once. */
+    @Test
+    void close_ShouldCloseAllWorkerGroups() throws Exception {
+        final WorkerGroupTaskDispatcher firstDispatcher = mock(WorkerGroupTaskDispatcher.class);
+        final WorkerGroupTaskDispatcher secondDispatcher = mock(WorkerGroupTaskDispatcher.class);
+
+        final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchers = new ConcurrentHashMap<>();
+        dispatchers.put("group1", firstDispatcher);
+        dispatchers.put("group2", secondDispatcher);
+        replaceDispatchWorkerMap(dispatchers);
+
+        workerGroupTaskDispatcherManager.close();
+
+        verify(firstDispatcher, times(1)).close();
+        verify(secondDispatcher, times(1)).close();
+    }
+
+    /** Swaps the manager's {@code dispatchWorkerMap} field for the given map via reflection. */
+    private void replaceDispatchWorkerMap(ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchers) {
+        ReflectionTestUtils.setField(workerGroupTaskDispatcherManager, "dispatchWorkerMap", dispatchers);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
new file mode 100644
index 0000000000..a4be0d4213
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+
+import java.time.Duration;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link WorkerGroupTaskDispatcher}, driven through a mocked {@link ITaskExecutorClient}.
+ */
+class WorkerGroupTaskDispatcherTest {
+
+    private WorkerGroupTaskDispatcher dispatcher;
+    private ITaskExecutorClient taskExecutorClient;
+
+    @BeforeEach
+    void setUp() {
+        taskExecutorClient = mock(ITaskExecutorClient.class);
+        dispatcher = new WorkerGroupTaskDispatcher("TestGroup", taskExecutorClient);
+    }
+
+    @Test
+    void addTaskToWorkerGroupQueue_StatusAllowed_TaskAdded() {
+        // Arrange
+        // The dispatcher is deliberately left unstarted: a running dispatcher thread could take the
+        // zero-delay task before the assertion below runs, which would make this test flaky.
+        ITaskExecutionRunnable taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
+        TaskInstance taskInstance = mock(TaskInstance.class);
+        when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+        doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
+
+        // Act
+        dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+
+        // Assert
+        assertFalse(dispatcher.queueSize() == 0);
+    }
+
+    @Test
+    void addTaskToWorkerGroupQueue_StatusNotAllowed_TaskNotAdded() {
+        // Arrange
+        ITaskExecutionRunnable taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
+        TaskInstance taskInstance = mock(TaskInstance.class);
+        when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+        doReturn(TaskExecutionStatus.RUNNING_EXECUTION).when(taskInstance).getState();
+
+        // Act
+        dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+
+        // Assert
+        // NOTE(review): despite the "TaskNotAdded" suffix this asserts that the task IS queued, so the task
+        // state is apparently not checked on enqueue; confirm against the dispatcher and consider renaming.
+        assertTrue(dispatcher.queueSize() > 0);
+    }
+
+    @Test
+    void start_DispatcherStartsSuccessfully() throws Exception {
+        // Act
+        dispatcher.start();
+
+        // Assert
+        try {
+            assertTrue(dispatcher.isAlive());
+        } finally {
+            // Stop the dispatcher so it does not outlive the test.
+            dispatcher.close();
+        }
+    }
+
+    @Test
+    void dispatch_TaskDispatchedSuccessfully() throws TaskDispatchException {
+        // Arrange
+        ITaskExecutionRunnable taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
+        TaskInstance taskInstance = mock(TaskInstance.class);
+        when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+        doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
+        doNothing().when(taskExecutorClient).dispatch(any());
+        dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+
+        // Act
+        dispatcher.start();
+
+        // Assert
+        // Poll for the dispatch while the dispatcher is still running; sleeping a fixed time and closing
+        // first could stop the dispatcher before it handled the task and leave nothing to wait for.
+        try {
+            Awaitility.await()
+                    .atMost(Duration.ofSeconds(1)).untilAsserted(
+                            () -> verify(taskExecutorClient, atLeastOnce()).dispatch(taskExecutionRunnable));
+        } finally {
+            dispatcher.close();
+        }
+    }
+
+    @Test
+    void dispatch_TaskDispatchFails_RetryLogicWorks() throws TaskDispatchException {
+        // Arrange
+        ITaskExecutionRunnable taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
+        TaskInstance taskInstance = mock(TaskInstance.class);
+        when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+        doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
+        doThrow(new RuntimeException()).when(taskExecutorClient).dispatch(any());
+        dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+
+        // Act
+        dispatcher.start();
+
+        // Assert
+        // Poll for the dispatch attempt while the dispatcher is still running; closing it first could
+        // stop it before the task was attempted and make the wait below time out.
+        try {
+            Awaitility.await()
+                    .atMost(Duration.ofSeconds(1)).untilAsserted(
+                            () -> verify(taskExecutorClient, atLeastOnce()).dispatch(any()));
+        } finally {
+            dispatcher.close();
+        }
+    }
+}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
new file mode 100644
index 0000000000..32aeda7f64
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link PriorityDelayQueue} holding {@link PriorityAndDelayBasedTaskEntry} elements.
+ */
+public class PriorityDelayQueueTest {
+
+    private PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> delayQueue;
+    private ITaskExecutionRunnable mockedRunnable;
+
+    @BeforeEach
+    public void setUp() {
+        mockedRunnable = mock(ITaskExecutionRunnable.class);
+        delayQueue = new PriorityDelayQueue<>();
+    }
+
+    /** Each added entry increases the queue size by one. */
+    @Test
+    public void testAdd() {
+        delayQueue.add(entryWithDelay(1000));
+        assertEquals(1, delayQueue.size());
+
+        delayQueue.add(entryWithDelay(2000));
+        assertEquals(2, delayQueue.size());
+    }
+
+    /** Taking the only entry returns it and leaves the queue empty. */
+    @Test
+    public void testTake() throws InterruptedException {
+        delayQueue.add(entryWithDelay(1000));
+
+        PriorityAndDelayBasedTaskEntry taken = delayQueue.take();
+
+        assertNotNull(taken);
+        assertEquals(0, delayQueue.size());
+    }
+
+    /** The size is zero for a fresh queue and one after a single add. */
+    @Test
+    public void testSize() {
+        assertEquals(0, delayQueue.size());
+
+        delayQueue.add(entryWithDelay(1000));
+        assertEquals(1, delayQueue.size());
+    }
+
+    /** Clearing the queue removes all entries. */
+    @Test
+    public void testClear() {
+        delayQueue.add(entryWithDelay(1000));
+        delayQueue.add(entryWithDelay(2000));
+        assertEquals(2, delayQueue.size());
+
+        delayQueue.clear();
+        assertEquals(0, delayQueue.size());
+    }
+
+    /** Builds an entry for the shared mocked runnable; {@code delay} is the first constructor argument. */
+    private PriorityAndDelayBasedTaskEntry entryWithDelay(int delay) {
+        return new PriorityAndDelayBasedTaskEntry(delay, mockedRunnable);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
new file mode 100644
index 0000000000..2d4292137b
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link TimeBasedTaskExecutionRunnableComparableEntry}: construction, the remaining
+ * delay reported by {@code getDelay} and {@code compareTo}.
+ */
+class TimeBasedTaskExecutionRunnableComparableEntryTest {
+
+    private static final long TEST_DELAY_MILLS = 1000L;
+    private final String testData = "testData";
+    private TimeBasedTaskExecutionRunnableComparableEntry<String> entry;
+
+    @BeforeEach
+    public void setUp() {
+        entry = new TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
+    }
+
+    /** A null payload must be rejected with the message {@code "data is null"}. */
+    @Test
+    void constructor_NullData_ThrowsNullPointerException() {
+        NullPointerException exception = assertThrows(NullPointerException.class,
+                () -> new TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, null));
+        assertEquals("data is null", exception.getMessage());
+    }
+
+    /** Within 500 ms of creation the remaining delay is observed as positive. */
+    @Test
+    void getDelay_BeforeTriggerTime_ReturnsPositive() {
+        entry = new TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
+        Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(
+                () -> assertTrue(entry.getDelay(TimeUnit.MILLISECONDS) > 0));
+    }
+
+    /** Around the trigger time the remaining delay is within +-200 ms of zero. */
+    @Test
+    void getDelay_AtTriggerTime_ReturnsZero() {
+        entry = new TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
+        Awaitility.await().atLeast(1000, TimeUnit.MILLISECONDS)
+                .with().pollInterval(1000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            long remainTime = entry.getDelay(TimeUnit.MILLISECONDS);
+                            // The allowable error is +-200
+                            assertTrue(Math.abs(remainTime) <= 200, "remainTime:" + remainTime);
+                        });
+    }
+
+    /** Once the delay has elapsed the remaining delay becomes negative. */
+    @Test
+    void getDelay_AfterTriggerTime_ReturnsNegative() {
+        entry = new TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
+        Awaitility.await().atMost(1500, TimeUnit.MILLISECONDS).untilAsserted(
+                () -> assertTrue(entry.getDelay(TimeUnit.MILLISECONDS) < 0));
+    }
+
+    /** The delay expressed in seconds never exceeds the millisecond delay divided by 1000. */
+    @Test
+    void getDelay_DifferentTimeUnits_ReturnsCorrectValues() {
+        long remainTimeMillis = entry.getDelay(TimeUnit.MILLISECONDS);
+        long remainTimeSeconds = entry.getDelay(TimeUnit.SECONDS);
+
+        assertTrue(remainTimeSeconds <= remainTimeMillis / 1000);
+    }
+
+    /** An entry compares equal to itself. */
+    @Test
+    void compareTo_SameObject_ReturnsZero() {
+        assertEquals(0, entry.compareTo(entry));
+    }
+}

Reply via email to