This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f5469d7741 [DSIP-55][Master] Separate the waiting dispatched task into
different queue by worker group (#17037)
f5469d7741 is described below
commit f5469d7741d3333c7c22985f7805c3bca685d2a7
Author: [email protected] <[email protected]>
AuthorDate: Thu Apr 17 09:18:13 2025 +0800
[DSIP-55][Master] Separate the waiting dispatched task into different queue
by worker group (#17037)
---
.../server/master/MasterServer.java | 9 +-
.../runner/GlobalTaskDispatchWaitingQueue.java | 27 ++--
.../GlobalTaskDispatchWaitingQueueLooper.java | 33 +++--
.../master/runner/WorkerGroupTaskDispatcher.java | 125 +++++++++++++++++++
.../runner/WorkerGroupTaskDispatcherManager.java | 83 +++++++++++++
...ue.java => PriorityAndDelayBasedTaskEntry.java} | 31 +++--
.../master/runner/queue/PriorityDelayQueue.java | 3 +
...eBasedTaskExecutionRunnableComparableEntry.java | 77 ++++++++++++
.../GlobalTaskDispatchWaitingQueueLooperTest.java | 83 +++----------
.../runner/GlobalTaskDispatchWaitingQueueTest.java | 99 +++------------
.../WorkerGroupTaskDispatcherManagerTest.java | 92 ++++++++++++++
.../runner/WorkerGroupTaskDispatcherTest.java | 136 +++++++++++++++++++++
.../runner/queue/PriorityDelayQueueTest.java | 75 ++++++++++++
...edTaskExecutionRunnableComparableEntryTest.java | 91 ++++++++++++++
14 files changed, 778 insertions(+), 186 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 3afd4d0990..1f5bd646f6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -41,6 +41,7 @@ import
org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMaste
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
+import
org.apache.dolphinscheduler.server.master.runner.WorkerGroupTaskDispatcherManager;
import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
import org.apache.dolphinscheduler.service.ServiceConfiguration;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -99,6 +100,9 @@ public class MasterServer implements IStoppable {
@Autowired
private MasterCoordinator masterCoordinator;
+ @Autowired
+ private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
+
public static void main(String[] args) {
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
@@ -129,6 +133,7 @@ public class MasterServer implements IStoppable {
this.masterCoordinator.start();
this.clusterManager.start();
+
this.clusterStateMonitors.start();
this.workflowEngine.start();
@@ -183,7 +188,9 @@ public class MasterServer implements IStoppable {
MasterRegistryClient closedMasterRegistryClient =
masterRegistryClient;
// close spring Context and will invoke method with
@PreDestroy annotation to destroy beans.
// like
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
- SpringApplicationContext closedSpringContext =
springApplicationContext) {
+ SpringApplicationContext closedSpringContext =
springApplicationContext;
+ WorkerGroupTaskDispatcherManager
closeWorkerGroupTaskDispatcherManager =
+ workerGroupTaskDispatcherManager) {
log.info("MasterServer is stopping, current cause : {}", cause);
} catch (Exception e) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
index b657437f9f..2f07be37b4 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
@@ -18,11 +18,12 @@
package org.apache.dolphinscheduler.server.master.runner;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import org.apache.dolphinscheduler.server.master.runner.queue.DelayEntry;
-import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+import
org.apache.dolphinscheduler.server.master.runner.queue.TimeBasedTaskExecutionRunnableComparableEntry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -31,25 +32,19 @@ import org.springframework.stereotype.Component;
/**
* The class is used to store {@link ITaskExecutionRunnable} which needs to be
dispatched. The {@link ITaskExecutionRunnable}
- * will be stored in {@link PriorityDelayQueue}, if the {@link
ITaskExecutionRunnable}'s delay time is 0, then it will be
+ * will be stored in {@link DelayQueue}, if the {@link
ITaskExecutionRunnable}'s delay time is 0, then it will be
* consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
* <p>
- * The order of {@link ITaskExecutionRunnable} in the {@link
PriorityDelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
+ * The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is
determined by the remaining delay, see {@link TimeBasedTaskExecutionRunnableComparableEntry#compareTo}.
*/
@Slf4j
@Component
public class GlobalTaskDispatchWaitingQueue {
private final Set<Integer> waitingTaskInstanceIds =
ConcurrentHashMap.newKeySet();
- private final PriorityDelayQueue<DelayEntry<ITaskExecutionRunnable>>
priorityDelayQueue =
- new PriorityDelayQueue<>();
- /**
- * Submit a {@link ITaskExecutionRunnable} with delay time 0, it will be
consumed immediately.
- */
- public synchronized void
dispatchTaskExecuteRunnable(ITaskExecutionRunnable ITaskExecutionRunnable) {
- dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable, 0);
- }
+ private final DelayQueue<TimeBasedTaskExecutionRunnableComparableEntry>
delayQueue =
+ new DelayQueue<>();
/**
* Submit a {@link ITaskExecutionRunnable} with delay time, if the delay
time <= 0 then it can be consumed.
@@ -57,17 +52,17 @@ public class GlobalTaskDispatchWaitingQueue {
public synchronized void
dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable
taskExecutionRunnable,
long
delayTimeMills) {
waitingTaskInstanceIds.add(taskExecutionRunnable.getTaskInstance().getId());
- priorityDelayQueue.add(new DelayEntry<>(delayTimeMills,
taskExecutionRunnable));
+ delayQueue.add(new
TimeBasedTaskExecutionRunnableComparableEntry(delayTimeMills,
taskExecutionRunnable));
}
/**
- * Consume {@link ITaskExecutionRunnable} from the {@link
PriorityDelayQueue}, only the delay time <= 0 can be consumed.
+ * Consume {@link ITaskExecutionRunnable} from the {@link
PriorityBlockingQueue}, only the delay time <= 0 can be consumed.
*/
@SneakyThrows
public ITaskExecutionRunnable takeTaskExecuteRunnable() {
- ITaskExecutionRunnable taskExecutionRunnable =
priorityDelayQueue.take().getData();
+ ITaskExecutionRunnable taskExecutionRunnable =
(ITaskExecutionRunnable) delayQueue.take().getData();
while (!markTaskExecutionRunnableRemoved(taskExecutionRunnable)) {
- taskExecutionRunnable = priorityDelayQueue.take().getData();
+ taskExecutionRunnable = (ITaskExecutionRunnable)
delayQueue.take().getData();
}
return taskExecutionRunnable;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
index 23883da8b6..33a9b51b8d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +37,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends
BaseDaemonThread imple
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
@Autowired
- private ITaskExecutorClient taskExecutorClient;
+ private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
@@ -73,23 +72,35 @@ public class GlobalTaskDispatchWaitingQueueLooper extends
BaseDaemonThread imple
log.warn("The TaskInstance {} state is : {}, will not
dispatch", taskInstance.getName(), status);
return;
}
- taskExecutorClient.dispatch(taskExecutionRunnable);
+ this.dispatchTaskToWorkerGroup(taskExecutionRunnable);
} catch (Exception e) {
- // If dispatch failed, will put the task back to the queue
- // The task will be dispatched after waiting time.
- // the waiting time will increase multiple of times, but will not
exceed 60 seconds
- long waitingTimeMills = Math.min(
-
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() *
1_000L, 60_000L);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
- waitingTimeMills);
- log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskInstance.getName(), waitingTimeMills, e);
+ this.delayRetryDispatch(taskExecutionRunnable, e);
}
}
+ private void delayRetryDispatch(ITaskExecutionRunnable
taskExecutionRunnable, Exception e) {
+ // If dispatch failed, will put the task back to the queue
+ // The task will be dispatched after waiting time.
+ // The waiting time is increaseDispatchFailTimes() * 1 second, but will not
exceed 60 seconds
+ long waitingTimeMills = Math.min(
+
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() *
1_000L, 60_000L);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
+ waitingTimeMills);
+ log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskExecutionRunnable.getTaskInstance().getName(),
+ waitingTimeMills, e);
+ }
+
+ private void dispatchTaskToWorkerGroup(ITaskExecutionRunnable
taskExecutionRunnable) {
+ workerGroupTaskDispatcherManager.addTaskToWorkerGroup(
+ taskExecutionRunnable.getTaskInstance().getWorkerGroup(),
+ taskExecutionRunnable, 0);
+ }
+
@Override
public void close() throws Exception {
if (RUNNING_FLAG.compareAndSet(true, false)) {
log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
+ workerGroupTaskDispatcherManager.close();
log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
} else {
log.error("GlobalTaskDispatchWaitingQueueLooper is not started");
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
new file mode 100644
index 0000000000..10991f068a
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
+import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the
task queue.
+ * The main responsibilities include:
+ * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for
dispatch.
+ * 2. Re-queuing tasks that fail to dispatch according to retry logic.
+ * 3. Ensuring thread safety and correct state transitions during task
processing.
+ */
+@Slf4j
+public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
+
+ private final ITaskExecutorClient taskExecutorClient;
+
+ // TODO The current queue is flawed. When a high-priority task fails,
+ // it will be delayed and will not return to the first or second position.
+ // Tasks with the same priority will preempt its position.
+ // If it needs to be placed at the front of the queue, the queue needs to
be re-implemented.
+ private final
PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>>
workerGroupQueue;
+
+ private final AtomicBoolean runningFlag = new AtomicBoolean(false);
+
+ public WorkerGroupTaskDispatcher(String workerGroupName,
ITaskExecutorClient taskExecutorClient) {
+ super("WorkerGroupTaskDispatcher-" + workerGroupName);
+ this.taskExecutorClient = taskExecutorClient;
+ this.workerGroupQueue = new PriorityDelayQueue<>();
+ }
+
+ /**
+ * Adds a task to the worker group queue.
+ * This method wraps the given task execution object into a priority and
delay-based task entry and adds it to the worker group queue.
+ * The task is added unconditionally: this method does not inspect the dispatcher's
+ * running state, and nothing is logged here.
+ *
+ * @param taskExecutionRunnable The task execution object to add to the
queue, which implements the {@link ITaskExecutionRunnable} interface.
+ * @param delayTimeMills The delay time in milliseconds before the task
should be executed.
+ */
+ public void addTaskToWorkerGroupQueue(ITaskExecutionRunnable
taskExecutionRunnable,
+ long delayTimeMills) {
+ workerGroupQueue.add(new
PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
+ }
+
+ @Override
+ public synchronized void start() {
+ if (runningFlag.compareAndSet(false, true)) {
+ log.info("The {} starting...", this.getName());
+ super.start();
+ log.info("The {} started", this.getName());
+ } else {
+ log.error("The {} status is {}, will not start again",
this.getName(), runningFlag.get());
+ }
+ }
+
+ public synchronized void close() {
+ log.info("The {} closed called but not implemented", this.getName());
+ // todo WorkerGroupTaskDispatcher thread needs to be shut down after
the WorkerGroup is deleted.
+ }
+
+ @Override
+ public void run() {
+ while (runningFlag.get()) {
+ dispatch();
+ }
+ }
+
+ private void dispatch() {
+ PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry =
workerGroupQueue.take();
+ ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
+ final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
+ try {
+ final TaskExecutionStatus taskStatus = taskInstance.getState();
+ if (taskStatus != TaskExecutionStatus.SUBMITTED_SUCCESS
+ && taskStatus != TaskExecutionStatus.DELAY_EXECUTION) {
+ log.warn("The TaskInstance {} state is : {}, will not
dispatch", taskInstance.getName(), taskStatus);
+ return;
+ }
+ taskExecutorClient.dispatch(taskExecutionRunnable);
+ } catch (Exception e) {
+ // If dispatch failed, will put the task back to the queue
+ // The task will be dispatched after waiting time.
+ // The waiting time is increaseDispatchFailTimes() * 1 second, but will not
exceed 60 seconds
+ long waitingTimeMills = Math.min(
+
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() *
1_000L, 60_000L);
+ workerGroupQueue.add(new
PriorityAndDelayBasedTaskEntry<>(waitingTimeMills, taskExecutionRunnable));
+ log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskInstance.getName(), waitingTimeMills, e);
+ }
+ }
+
+ /**
+ * Only used in unit tests.
+ * @return size
+ */
+ protected int queueSize() {
+ return this.workerGroupQueue.size();
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
new file mode 100644
index 0000000000..456f2cd94c
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupTaskDispatcherManager implements AutoCloseable {
+
+ @Autowired
+ private ITaskExecutorClient taskExecutorClient;
+
+ @Getter
+ private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher>
dispatchWorkerMap;
+
+ public WorkerGroupTaskDispatcherManager() {
+ dispatchWorkerMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Adds a task to the specified worker group queue and starts or wakes up
the corresponding processing loop.
+ *
+ * @param workerGroup the identifier for the worker group, used to
distinguish different task queues
+ * @param taskExecutionRunnable an instance of ITaskExecutionRunnable
representing the task to be executed
+ * @param delayTimeMills the delay time before the task is executed, in
milliseconds
+ */
+ public synchronized void addTaskToWorkerGroup(String workerGroup,
ITaskExecutionRunnable taskExecutionRunnable,
+ long delayTimeMills) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.computeIfAbsent(
+ workerGroup, key -> new WorkerGroupTaskDispatcher(workerGroup,
taskExecutorClient));
+ if (!workerGroupTaskDispatcher.isAlive()) {
+ workerGroupTaskDispatcher.start();
+ }
+
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable,
delayTimeMills);
+ }
+
+ /**
+ * Invoke {@code close()} on every {@link WorkerGroupTaskDispatcher} registered in this manager.
+ */
+ @Override
+ public void close() throws Exception {
+ log.info("WorkerGroupTaskDispatcherManager start close");
+ for (Map.Entry<String, WorkerGroupTaskDispatcher> entry :
dispatchWorkerMap.entrySet()) {
+ try {
+ entry.getValue().close();
+ } catch (Exception e) {
+ log.error("close worker group error", e);
+ }
+ }
+ log.info("WorkerGroupTaskDispatcherManager closed");
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
similarity index 56%
copy from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
index 8ed4869625..bf762afb3e 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
@@ -17,25 +17,34 @@
package org.apache.dolphinscheduler.server.master.runner.queue;
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
-import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
-public class PriorityDelayQueue<V extends DelayEntry> {
+public class PriorityAndDelayBasedTaskEntry<V extends Comparable<V>> extends
DelayEntry<V> {
- private final DelayQueue<V> queue = new DelayQueue<>();
+ public PriorityAndDelayBasedTaskEntry(long delayTimeMills, V data) {
+ super(delayTimeMills, data);
+ }
- public void add(V v) {
- queue.put(v);
+ @Override
+ public long getDelay(@NotNull TimeUnit unit) {
+ return super.getDelay(unit);
}
- @SneakyThrows
- public V take() {
- return queue.take();
+ @Override
+ public int compareTo(@NotNull Delayed o) {
+ return super.compareTo(o);
}
- public int size() {
- return queue.size();
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
}
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
index 8ed4869625..cf67281323 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
@@ -38,4 +38,7 @@ public class PriorityDelayQueue<V extends DelayEntry> {
return queue.size();
}
+ public void clear() {
+ queue.clear();
+ }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
new file mode 100644
index 0000000000..9bb47667e7
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Getter;
+
+import org.jetbrains.annotations.NotNull;
+
+public class
TimeBasedTaskExecutionRunnableComparableEntry<ITaskExecutionRunnable>
implements Delayed {
+
+ private final long triggerTimeMills;
+ private final long delayTimeMills;
+
+ @Getter
+ private final ITaskExecutionRunnable data;
+ public TimeBasedTaskExecutionRunnableComparableEntry(long delayTimeMills,
ITaskExecutionRunnable data) {
+ this.delayTimeMills = delayTimeMills;
+ this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills;
+ this.data = checkNotNull(data, "data is null");
+ }
+
+ @Override
+ public long getDelay(@NotNull TimeUnit unit) {
+ long remainTimeMills = triggerTimeMills - System.currentTimeMillis();
+ if (TimeUnit.MILLISECONDS.equals(unit)) {
+ return remainTimeMills;
+ }
+ return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(@NotNull Delayed delayed) {
+ if (this == delayed) {
+ return 0;
+ }
+ return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
delayed.getDelay(TimeUnit.MILLISECONDS));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TimeBasedTaskExecutionRunnableComparableEntry<?> that =
(TimeBasedTaskExecutionRunnableComparableEntry<?>) o;
+ return this.getDelay(TimeUnit.MILLISECONDS) ==
that.getDelay(TimeUnit.MILLISECONDS)
+ && Objects.equals(data, that.getData());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(data);
+ }
+}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
index f36869cb03..0cc5571f01 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
@@ -17,31 +17,17 @@
package org.apache.dolphinscheduler.server.master.runner;
-import static java.time.Duration.ofSeconds;
-import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
-import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
-import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
-
-import java.util.HashMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -50,7 +36,6 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
-import org.springframework.context.ApplicationContext;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@@ -63,61 +48,25 @@ class GlobalTaskDispatchWaitingQueueLooperTest {
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
@Mock
- private ITaskExecutorClient taskExecutorClient;
-
- @Test
- void testTaskExecutionRunnableStatusIsNotSubmitted() throws Exception {
- WorkflowInstance workflowInstance = new WorkflowInstance();
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setState(TaskExecutionStatus.KILL);
- taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
- final ITaskExecutionRunnable defaultTaskExecuteRunnable =
- createTaskExecuteRunnable(taskInstance, workflowInstance);
+ private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
- doNothing().when(taskExecutorClient).dispatch(any());
-
-
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(defaultTaskExecuteRunnable);
- globalTaskDispatchWaitingQueueLooper.doDispatch();
- await().during(ofSeconds(1))
- .untilAsserted(() -> verify(taskExecutorClient,
never()).dispatch(any()));
- globalTaskDispatchWaitingQueueLooper.close();
- }
+ @Mock
+ private ITaskExecutionRunnable taskExecutionRunnable;
@Test
- void testTaskExecutionRunnableStatusIsSubmitted() throws Exception {
- WorkflowInstance workflowInstance = new WorkflowInstance();
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
- taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
- final ITaskExecutionRunnable defaultTaskExecuteRunnable =
- createTaskExecuteRunnable(taskInstance, workflowInstance);
-
- doNothing().when(taskExecutorClient).dispatch(any());
+ void testTaskExecutionRunnableStatusIsSubmittedNoWorkerGroup() {
-
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(defaultTaskExecuteRunnable);
+
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(taskExecutionRunnable);
+ TaskInstance taskInstance = mock(TaskInstance.class);
+
when(taskInstance.getState()).thenReturn(TaskExecutionStatus.SUBMITTED_SUCCESS);
+ when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+
when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(mock(TaskExecutionContext.class));
globalTaskDispatchWaitingQueueLooper.doDispatch();
- await().atMost(ofSeconds(1)).untilAsserted(() -> {
- verify(taskExecutorClient,
atLeastOnce()).dispatch(any(ITaskExecutionRunnable.class));
- });
- }
+ verify(workerGroupTaskDispatcherManager,
times(1)).addTaskToWorkerGroup(any(),
+ any(ITaskExecutionRunnable.class),
+ anyLong());
- private ITaskExecutionRunnable createTaskExecuteRunnable(final
TaskInstance taskInstance,
- final
WorkflowInstance workflowInstance) {
-
- final ApplicationContext applicationContext =
mock(ApplicationContext.class);
- when(applicationContext.getBean(TaskExecutionContextFactory.class))
- .thenReturn(mock(TaskExecutionContextFactory.class));
- final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
TaskExecutionRunnableBuilder.builder()
- .applicationContext(applicationContext)
- .workflowInstance(workflowInstance)
- .taskInstance(taskInstance)
- .workflowExecutionGraph(new WorkflowExecutionGraph())
- .workflowDefinition(new WorkflowDefinition())
- .project(new Project())
- .taskDefinition(new TaskDefinition())
- .workflowEventBus(new WorkflowEventBus())
- .build();
- return new TaskExecutionRunnable(taskExecutionRunnableBuilder);
}
+
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
index 8c4ddf616c..209c1b4376 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
@@ -36,7 +36,6 @@ import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecut
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.time.DateUtils;
import java.time.Duration;
import java.util.Date;
@@ -45,23 +44,22 @@ import java.util.concurrent.CompletableFuture;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.context.ApplicationContext;
+@ExtendWith(MockitoExtension.class)
class GlobalTaskDispatchWaitingQueueTest {
+ @InjectMocks
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
- @BeforeEach
- public void setUp() {
- globalTaskDispatchWaitingQueue = new GlobalTaskDispatchWaitingQueue();
- }
-
@Test
void submitTaskExecuteRunnable() {
- ITaskExecutionRunnable ITaskExecutionRunnable =
createTaskExecuteRunnable();
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable);
+ ITaskExecutionRunnable iTaskExecutionRunnable =
createTaskExecuteRunnable();
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable,
500);
Awaitility.await()
.atMost(Duration.ofSeconds(1))
.untilAsserted(
@@ -71,14 +69,10 @@ class GlobalTaskDispatchWaitingQueueTest {
@Test
void testSubmitTaskExecuteRunnableWithDelay() {
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(createTaskExecuteRunnable(),
3_000L);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(createTaskExecuteRunnable());
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(createTaskExecuteRunnable(),
500);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
- Awaitility.await()
- .atLeast(Duration.ofSeconds(2))
- .atMost(Duration.ofSeconds(4))
- .untilAsserted(
- () ->
Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
+
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
}
@Test
@@ -93,87 +87,32 @@ class GlobalTaskDispatchWaitingQueueTest {
}
@Test
- void takeTaskExecuteRunnable_withDifferentTaskInstancePriority() {
+ void takeTaskExecuteRunnable_withDifferentTaskInstanceDelay() {
ITaskExecutionRunnable taskExecutionRunnable1 =
createTaskExecuteRunnable();
taskExecutionRunnable1.getTaskInstance().setId(1);
-
taskExecutionRunnable1.getTaskInstance().setTaskInstancePriority(Priority.MEDIUM);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecutionRunnable1);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable1,
0);
- ITaskExecutionRunnable ITaskExecutionRunnable2 =
createTaskExecuteRunnable();
- ITaskExecutionRunnable2.getTaskInstance().setId(2);
-
ITaskExecutionRunnable2.getTaskInstance().setTaskInstancePriority(Priority.HIGH);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable2);
+ ITaskExecutionRunnable iTaskExecutionRunnable2 =
createTaskExecuteRunnable();
+ iTaskExecutionRunnable2.getTaskInstance().setId(2);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable2,
1);
- ITaskExecutionRunnable ITaskExecutionRunnable3 =
createTaskExecuteRunnable();
- ITaskExecutionRunnable3.getTaskInstance().setId(3);
-
ITaskExecutionRunnable3.getTaskInstance().setTaskInstancePriority(Priority.LOW);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable3);
+ ITaskExecutionRunnable iTaskExecutionRunnable3 =
createTaskExecuteRunnable();
+ iTaskExecutionRunnable3.getTaskInstance().setId(3);
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable3,
2);
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
- .isEqualTo(2);
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
- .isEqualTo(1);
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
- .isEqualTo(3);
- }
-
- @Test
- void takeTaskExecuteRunnable_withDifferentTaskGroupPriority() {
- ITaskExecutionRunnable ITaskExecutionRunnable1 =
createTaskExecuteRunnable();
- ITaskExecutionRunnable1.getTaskInstance().setId(1);
-
ITaskExecutionRunnable1.getTaskInstance().setTaskGroupPriority(Priority.MEDIUM.getCode());
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable1);
-
- ITaskExecutionRunnable ITaskExecutionRunnable2 =
createTaskExecuteRunnable();
- ITaskExecutionRunnable2.getTaskInstance().setId(2);
-
ITaskExecutionRunnable2.getTaskInstance().setTaskGroupPriority(Priority.HIGH.getCode());
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable2);
-
- ITaskExecutionRunnable ITaskExecutionRunnable3 =
createTaskExecuteRunnable();
- ITaskExecutionRunnable3.getTaskInstance().setId(3);
-
ITaskExecutionRunnable3.getTaskInstance().setTaskGroupPriority(Priority.LOW.getCode());
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable3);
-
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
- .isEqualTo(3);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(1);
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(2);
- }
-
- @Test
- void takeTaskExecuteRunnable_withDifferentSubmitTime() {
- Date now = new Date();
-
- ITaskExecutionRunnable ITaskExecutionRunnable1 =
createTaskExecuteRunnable();
- ITaskExecutionRunnable1.getTaskInstance().setId(1);
- ITaskExecutionRunnable1.getTaskInstance().setFirstSubmitTime(now);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable1);
-
- ITaskExecutionRunnable ITaskExecutionRunnable2 =
createTaskExecuteRunnable();
- ITaskExecutionRunnable2.getTaskInstance().setId(2);
-
ITaskExecutionRunnable2.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now,
1));
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable2);
-
- ITaskExecutionRunnable ITaskExecutionRunnable3 =
createTaskExecuteRunnable();
- ITaskExecutionRunnable3.getTaskInstance().setId(3);
-
ITaskExecutionRunnable3.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now,
-1));
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable3);
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
.isEqualTo(3);
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
- .isEqualTo(1);
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
- .isEqualTo(2);
}
@Test
void getWaitingDispatchTaskNumber() {
Assertions.assertEquals(0,
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
- ITaskExecutionRunnable ITaskExecutionRunnable =
createTaskExecuteRunnable();
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(ITaskExecutionRunnable);
+ ITaskExecutionRunnable iTaskExecutionRunnable =
createTaskExecuteRunnable();
+
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable,
500);
Assertions.assertEquals(1,
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
new file mode 100644
index 0000000000..6bfe96e4b5
--- /dev/null
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.test.util.ReflectionTestUtils;
+
+@ExtendWith(MockitoExtension.class)
+class WorkerGroupTaskDispatcherManagerTest {
+
+ @InjectMocks
+ private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
+
+ @Mock
+ private ITaskExecutionRunnable taskExecutionRunnable;
+
+ @Test
+ void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
+ String workerGroup = "newGroup";
+ long delayTimeMills = 1000;
+
+ workerGroupTaskDispatcherManager.addTaskToWorkerGroup(workerGroup,
taskExecutionRunnable, delayTimeMills);
+
+ ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap
=
+ workerGroupTaskDispatcherManager.getDispatchWorkerMap();
+
+ assertTrue(dispatchWorkerMap.containsKey(workerGroup));
+ }
+
+ @Test
+ void addTaskToWorkerGroup_ExistingWorkerGroup_ShouldAddTask() {
+ String workerGroup = "existingGroup";
+ long delayTimeMills = 1000;
+
+ WorkerGroupTaskDispatcher mockDispatcher =
mock(WorkerGroupTaskDispatcher.class);
+
+ ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap
= new ConcurrentHashMap<>();
+ dispatchWorkerMap.put(workerGroup, mockDispatcher);
+
+ ReflectionTestUtils.setField(workerGroupTaskDispatcherManager,
"dispatchWorkerMap", dispatchWorkerMap);
+ doNothing().when(mockDispatcher).start();
+ workerGroupTaskDispatcherManager.addTaskToWorkerGroup(workerGroup,
taskExecutionRunnable, delayTimeMills);
+
+ verify(mockDispatcher,
times(1)).addTaskToWorkerGroupQueue(taskExecutionRunnable, delayTimeMills);
+ }
+
+ @Test
+ void close_ShouldCloseAllWorkerGroups() throws Exception {
+ WorkerGroupTaskDispatcher mockDispatcher1 =
mock(WorkerGroupTaskDispatcher.class);
+ WorkerGroupTaskDispatcher mockDispatcher2 =
mock(WorkerGroupTaskDispatcher.class);
+
+ ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap
= new ConcurrentHashMap<>();
+ dispatchWorkerMap.put("group1", mockDispatcher1);
+ dispatchWorkerMap.put("group2", mockDispatcher2);
+
+ ReflectionTestUtils.setField(workerGroupTaskDispatcherManager,
"dispatchWorkerMap", dispatchWorkerMap);
+
+ workerGroupTaskDispatcherManager.close();
+
+ verify(mockDispatcher1, times(1)).close();
+ verify(mockDispatcher2, times(1)).close();
+ }
+}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
new file mode 100644
index 0000000000..a4be0d4213
--- /dev/null
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import static org.apache.dolphinscheduler.common.thread.ThreadUtils.sleep;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+
+import java.time.Duration;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class WorkerGroupTaskDispatcherTest {
+
+ private WorkerGroupTaskDispatcher dispatcher;
+ private ITaskExecutorClient taskExecutorClient;
+
+ @BeforeEach
+ void setUp() {
+ taskExecutorClient = mock(ITaskExecutorClient.class);
+ dispatcher = new WorkerGroupTaskDispatcher("TestGroup",
taskExecutorClient);
+ }
+
+ @Test
+ void addTaskToWorkerGroupQueue_StatusAllowed_TaskAdded() {
+ // Arrange
+ ITaskExecutionRunnable taskExecutionRunnable =
mock(ITaskExecutionRunnable.class);
+ TaskInstance taskInstance = mock(TaskInstance.class);
+ when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+
doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
+ dispatcher.start();
+
+ // Act
+ dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+
+ // Assert
+ assertFalse(dispatcher.queueSize() == 0);
+ }
+
+ @Test
+ void addTaskToWorkerGroupQueue_StatusNotAllowed_TaskNotAdded() {
+ // Arrange
+ ITaskExecutionRunnable taskExecutionRunnable =
mock(ITaskExecutionRunnable.class);
+ TaskInstance taskInstance = mock(TaskInstance.class);
+ when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+
doReturn(TaskExecutionStatus.RUNNING_EXECUTION).when(taskInstance).getState();
+
+ // Act
+ dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+
+ // Assert
+ assertTrue(dispatcher.queueSize() > 0);
+ }
+
+ @Test
+ void start_DispatcherStartsSuccessfully() {
+ // Act
+ dispatcher.start();
+
+ // Assert
+ assertTrue(dispatcher.isAlive());
+ }
+
+ @Test
+ void dispatch_TaskDispatchedSuccessfully() throws TaskDispatchException {
+ // Arrange
+ ITaskExecutionRunnable taskExecutionRunnable =
mock(ITaskExecutionRunnable.class);
+ TaskInstance taskInstance = mock(TaskInstance.class);
+ when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+
doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
+ doNothing().when(taskExecutorClient).dispatch(any());
+ dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+
+ // Act
+ dispatcher.start();
+ sleep(100); // Give some time for the dispatcher to run
+ dispatcher.close();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1)).untilAsserted(
+ () -> verify(taskExecutorClient,
atLeastOnce()).dispatch(taskExecutionRunnable));
+ // Assert
+
+ }
+
+ @Test
+ void dispatch_TaskDispatchFails_RetryLogicWorks() throws
TaskDispatchException {
+ // Arrange
+ ITaskExecutionRunnable taskExecutionRunnable =
mock(ITaskExecutionRunnable.class);
+ TaskInstance taskInstance = mock(TaskInstance.class);
+ when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+
doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
+ doThrow(new
RuntimeException()).when(taskExecutorClient).dispatch(any());
+ dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+
+ // Act
+ dispatcher.start();
+ sleep(100); // Give some time for the dispatcher to run
+ dispatcher.close();
+
+ // Assert
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(1)).untilAsserted(
+ () -> verify(taskExecutorClient,
atLeastOnce()).dispatch(any()));
+ }
+}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
new file mode 100644
index 0000000000..32aeda7f64
--- /dev/null
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class PriorityDelayQueueTest {
+
+ private PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> queue;
+ private ITaskExecutionRunnable taskExecutionRunnable;
+
+ @BeforeEach
+ public void setUp() {
+ queue = new PriorityDelayQueue<>();
+ taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
+ }
+
+ @Test
+ public void testAdd() {
+ queue.add(new PriorityAndDelayBasedTaskEntry(1000,
taskExecutionRunnable));
+ assertEquals(1, queue.size());
+
+ queue.add(new PriorityAndDelayBasedTaskEntry(2000,
taskExecutionRunnable));
+ assertEquals(2, queue.size());
+ }
+
+ @Test
+ public void testTake() throws InterruptedException {
+ queue.add(new PriorityAndDelayBasedTaskEntry(1000,
taskExecutionRunnable));
+ PriorityAndDelayBasedTaskEntry entry = queue.take();
+ assertNotNull(entry);
+ assertEquals(0, queue.size());
+
+ }
+
+ @Test
+ public void testSize() {
+ assertEquals(0, queue.size());
+
+ queue.add(new PriorityAndDelayBasedTaskEntry(1000,
taskExecutionRunnable));
+ assertEquals(1, queue.size());
+ }
+
+ @Test
+ public void testClear() {
+ queue.add(new PriorityAndDelayBasedTaskEntry(1000,
taskExecutionRunnable));
+ queue.add(new PriorityAndDelayBasedTaskEntry(2000,
taskExecutionRunnable));
+ assertEquals(2, queue.size());
+
+ queue.clear();
+ assertEquals(0, queue.size());
+ }
+}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
new file mode 100644
index 0000000000..2d4292137b
--- /dev/null
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TimeBasedTaskExecutionRunnableComparableEntryTest {
+
+ private static final long TEST_DELAY_MILLS = 1000L;
+ private final String testData = "testData";
+ private TimeBasedTaskExecutionRunnableComparableEntry<String> entry;
+
+ @BeforeEach
+ public void setUp() {
+ entry = new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
+ }
+
+ @Test
+ void constructor_NullData_ThrowsNullPointerException() {
+ try {
+ new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, null);
+ fail("Expected NullPointerException to be thrown");
+ } catch (NullPointerException e) {
+ assertEquals("data is null", e.getMessage());
+ }
+ }
+
+ @Test
+ void getDelay_BeforeTriggerTime_ReturnsPositive() {
+ entry = new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
+ Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(
+ () -> assertTrue(entry.getDelay(TimeUnit.MILLISECONDS) > 0));
+ }
+
+ @Test
+ void getDelay_AtTriggerTime_ReturnsZero() {
+ entry = new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
+ Awaitility.await().atLeast(1000, TimeUnit.MILLISECONDS)
+ .with().pollInterval(1000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ long remainTime =
entry.getDelay(TimeUnit.MILLISECONDS);
+ // The allowable error is +-200
+ System.out.println("remainTime:" + remainTime);
+ assertTrue(Math.abs(remainTime) <= 200);
+ });
+ }
+
+ @Test
+ void getDelay_AfterTriggerTime_ReturnsNegative() {
+ entry = new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
+ Awaitility.await().atMost(1500, TimeUnit.MILLISECONDS).untilAsserted(
+ () -> assertTrue(entry.getDelay(TimeUnit.MILLISECONDS) < 0));
+ }
+
+ @Test
+ void getDelay_DifferentTimeUnits_ReturnsCorrectValues() {
+ long remainTimeMillis = entry.getDelay(TimeUnit.MILLISECONDS);
+ long remainTimeSeconds = entry.getDelay(TimeUnit.SECONDS);
+
+ assertTrue(remainTimeSeconds <= remainTimeMillis / 1000);
+ }
+
+ @Test
+ void compareTo_SameObject_ReturnsZero() {
+ assertEquals(0, entry.compareTo(entry));
+ }
+}