This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new bdec8b14f6 [Improvement-17179][Master] Remove
GlobalTaskDispatchWaitingQueue (#17180)
bdec8b14f6 is described below
commit bdec8b14f6bfda7c4ca21d5449957c2da941b243
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 14 23:15:37 2025 +0800
[Improvement-17179][Master] Remove GlobalTaskDispatchWaitingQueue (#17180)
---
.../server/master/MasterServer.java | 8 +-
.../server/master/engine/WorkflowEngine.java | 8 +-
.../task/dispatcher/WorkerGroupDispatcher.java} | 93 ++++++++------
.../WorkerGroupDispatcherCoordinator.java | 106 +++++++++++++++
.../statemachine/TaskSubmittedStateAction.java | 10 +-
.../runner/GlobalTaskDispatchWaitingQueue.java | 77 -----------
.../GlobalTaskDispatchWaitingQueueLooper.java | 110 ----------------
.../runner/WorkerGroupTaskDispatcherManager.java | 83 ------------
.../WorkerGroupDispatcherCoordinatorTest.java | 63 +++++++++
.../dispatcher/WorkerGroupDispatcherTest.java} | 91 +++++--------
.../GlobalTaskDispatchWaitingQueueLooperTest.java | 72 -----------
.../runner/GlobalTaskDispatchWaitingQueueTest.java | 143 ---------------------
.../WorkerGroupTaskDispatcherManagerTest.java | 92 -------------
.../task/executor/log/TaskExecutorMDCUtils.java | 8 +-
14 files changed, 273 insertions(+), 691 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 1f5bd646f6..f0dfdd70e2 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -38,10 +38,10 @@ import
org.apache.dolphinscheduler.server.master.engine.WorkflowEngine;
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
import
org.apache.dolphinscheduler.server.master.engine.system.SystemEventBusFireWorker;
import
org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
+import
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
-import
org.apache.dolphinscheduler.server.master.runner.WorkerGroupTaskDispatcherManager;
import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
import org.apache.dolphinscheduler.service.ServiceConfiguration;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -101,7 +101,7 @@ public class MasterServer implements IStoppable {
private MasterCoordinator masterCoordinator;
@Autowired
- private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
+ private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
public static void main(String[] args) {
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
@@ -189,8 +189,8 @@ public class MasterServer implements IStoppable {
// close spring Context and will invoke method with
@PreDestroy annotation to destroy beans.
// like
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
SpringApplicationContext closedSpringContext =
springApplicationContext;
- WorkerGroupTaskDispatcherManager
closeWorkerGroupTaskDispatcherManager =
- workerGroupTaskDispatcherManager) {
+ WorkerGroupDispatcherCoordinator
closeWorkerGroupDispatcherCoordinator =
+ workerGroupDispatcherCoordinator) {
log.info("MasterServer is stopping, current cause : {}", cause);
} catch (Exception e) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
index 6639fc553b..a0029b417a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.engine;
import org.apache.dolphinscheduler.server.master.engine.command.CommandEngine;
import
org.apache.dolphinscheduler.server.master.engine.executor.LogicTaskEngineDelegator;
-import
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueueLooper;
+import
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
import lombok.extern.slf4j.Slf4j;
@@ -37,7 +37,7 @@ public class WorkflowEngine implements AutoCloseable {
private CommandEngine commandEngine;
@Autowired
- private GlobalTaskDispatchWaitingQueueLooper
globalTaskDispatchWaitingQueueLooper;
+ private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
@Autowired
private LogicTaskEngineDelegator logicTaskEngineDelegator;
@@ -48,7 +48,7 @@ public class WorkflowEngine implements AutoCloseable {
commandEngine.start();
- globalTaskDispatchWaitingQueueLooper.start();
+ workerGroupDispatcherCoordinator.start();
logicTaskEngineDelegator.start();
@@ -60,7 +60,7 @@ public class WorkflowEngine implements AutoCloseable {
try (
final CommandEngine ignore1 = commandEngine;
final WorkflowEventBusCoordinator ignore2 =
workflowEventBusCoordinator;
- final GlobalTaskDispatchWaitingQueueLooper ignore3 =
globalTaskDispatchWaitingQueueLooper;
+ final WorkerGroupDispatcherCoordinator ignore3 =
workerGroupDispatcherCoordinator;
final LogicTaskEngineDelegator ignore5 =
logicTaskEngineDelegator) {
// closed the resource
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
similarity index 64%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
index 10991f068a..d061f29aa8 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
@@ -37,7 +38,7 @@ import lombok.extern.slf4j.Slf4j;
* 3. Ensuring thread safety and correct state transitions during task
processing.
*/
@Slf4j
-public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
+public class WorkerGroupDispatcher extends BaseDaemonThread {
private final ITaskExecutorClient taskExecutorClient;
@@ -47,26 +48,16 @@ public class WorkerGroupTaskDispatcher extends
BaseDaemonThread {
// If it needs to be placed at the front of the queue, the queue needs to
be re-implemented.
private final
PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>>
workerGroupQueue;
+ private final Set<Integer> waitingDispatchTaskIds;
+
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
- public WorkerGroupTaskDispatcher(String workerGroupName,
ITaskExecutorClient taskExecutorClient) {
+ public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient
taskExecutorClient) {
super("WorkerGroupTaskDispatcher-" + workerGroupName);
this.taskExecutorClient = taskExecutorClient;
this.workerGroupQueue = new PriorityDelayQueue<>();
- }
-
- /**
- * Adds a task to the worker group queue.
- * This method wraps the given task execution object into a priority and
delay-based task entry and adds it to the worker group queue.
- * The task is only added if the current dispatcher status is either
STARTED or INIT. If the dispatcher is in any other state,
- * the task addition will fail, and a warning message will be logged.
- *
- * @param taskExecutionRunnable The task execution object to add to the
queue, which implements the {@link ITaskExecutionRunnable} interface.
- * @param delayTimeMills The delay time in milliseconds before the task
should be executed.
- */
- public void addTaskToWorkerGroupQueue(ITaskExecutionRunnable
taskExecutionRunnable,
- long delayTimeMills) {
- workerGroupQueue.add(new
PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
+ this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
+ log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
}
@Override
@@ -80,27 +71,25 @@ public class WorkerGroupTaskDispatcher extends
BaseDaemonThread {
}
}
- public synchronized void close() {
- log.info("The {} closed called but not implemented", this.getName());
- // todo WorkerGroupTaskDispatcher thread needs to be shut down after
the WorkerGroup is deleted.
- }
-
@Override
public void run() {
while (runningFlag.get()) {
- dispatch();
+ PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry =
workerGroupQueue.take();
+ ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
+ try (
+ TaskExecutorMDCUtils.MDCAutoClosable ignore =
+
TaskExecutorMDCUtils.logWithMDC(taskExecutionRunnable.getId())) {
+ doDispatchTask(taskExecutionRunnable);
+ }
}
}
- private void dispatch() {
- PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry =
workerGroupQueue.take();
- ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
- final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
+ private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
try {
- final TaskExecutionStatus taskStatus = taskInstance.getState();
- if (taskStatus != TaskExecutionStatus.SUBMITTED_SUCCESS
- && taskStatus != TaskExecutionStatus.DELAY_EXECUTION) {
- log.warn("The TaskInstance {} state is : {}, will not
dispatch", taskInstance.getName(), taskStatus);
+ if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId()))
{
+ log.info(
+ "The task: {} doesn't exist in
waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
+ taskExecutionRunnable.getId());
return;
}
taskExecutorClient.dispatch(taskExecutionRunnable);
@@ -110,16 +99,44 @@ public class WorkerGroupTaskDispatcher extends
BaseDaemonThread {
// the waiting time will increase multiple of times, but will not
exceed 60 seconds
long waitingTimeMills = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() *
1_000L, 60_000L);
- workerGroupQueue.add(new
PriorityAndDelayBasedTaskEntry<>(waitingTimeMills, taskExecutionRunnable));
- log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskInstance.getName(), waitingTimeMills, e);
+ dispatchTask(taskExecutionRunnable, waitingTimeMills);
+ log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskExecutionRunnable.getId(),
+ waitingTimeMills, e);
}
}
/**
- * ony use unit test
- * @return size
+ * Adds a task to the worker group queue.
+ * This method wraps the given task execution object into a priority and
delay-based task entry and adds it to the worker group queue.
+ * The task id is also recorded in waitingDispatchTaskIds, so the task can
still be removed (e.g. paused or killed) before it is dispatched.
+ * The task is always added; no dispatcher state check is performed.
+ *
+ * @param taskExecutionRunnable The task execution object to add to the
queue, which implements the {@link ITaskExecutionRunnable} interface.
+ * @param delayTimeMills The delay time in milliseconds before the
task should be executed.
*/
- protected int queueSize() {
+ public void dispatchTask(final ITaskExecutionRunnable
taskExecutionRunnable, final long delayTimeMills) {
+ waitingDispatchTaskIds.add(taskExecutionRunnable.getId());
+ workerGroupQueue.add(new
PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
+ }
+
+ public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ return waitingDispatchTaskIds.remove(taskExecutionRunnable.getId());
+ }
+
+ public boolean existTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ return waitingDispatchTaskIds.contains(taskExecutionRunnable.getId());
+ }
+
+ public synchronized void close() {
+ // todo WorkerGroupDispatcher thread needs to be shut down after
the WorkerGroup is deleted.
+ if (runningFlag.compareAndSet(true, false)) {
+ log.info("WorkerGroupDispatcher {} closed", this.getName());
+ } else {
+ log.warn("The WorkerGroupDispatcher: {} doesn't started",
this.getName());
+ }
+ }
+
+ int queueSize() {
return this.workerGroupQueue.size();
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
new file mode 100644
index 0000000000..1683b86d5c
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
+
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupDispatcherCoordinator is responsible for managing the task
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their WorkerGroupDispatcher
(each of which owns a priority delay queue),
+ * and supports dispatching and removing tasks, lazily creating and starting
dispatchers, as well as closing them upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupDispatcherCoordinator implements AutoCloseable {
+
+ @Autowired
+ private ITaskExecutorClient taskExecutorClient;
+
+ private final ConcurrentHashMap<String, WorkerGroupDispatcher>
workerGroupDispatcherMap;
+
+ public WorkerGroupDispatcherCoordinator() {
+ workerGroupDispatcherMap = new ConcurrentHashMap<>();
+ }
+
+ public void start() {
+ log.info("WorkerGroupTaskDispatcherManager started...");
+ }
+
+ /**
+ * Dispatch task to the worker group with the specified remaining time.
+ */
+ public void dispatchTask(final ITaskExecutionRunnable
taskExecutionRunnable,
+ final long delayTimeMills) {
+ final String workerGroup =
taskExecutionRunnable.getTaskInstance().getWorkerGroup();
+
getOrCreateWorkerGroupDispatcher(workerGroup).dispatchTask(taskExecutionRunnable,
delayTimeMills);
+ log.info("Success add Task: {} to WorkerGroupDispatcher: {}",
taskExecutionRunnable.getId(), workerGroup);
+ }
+
+ /**
+ * Remove task from the dispatcher.
+ * <p> If the task doesn't exist in the dispatcher, it will return false;
this means the task might already have been dispatched.
+ */
+ public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ final String workerGroup =
taskExecutionRunnable.getTaskInstance().getWorkerGroup();
+ boolean removed =
getOrCreateWorkerGroupDispatcher(workerGroup).removeTask(taskExecutionRunnable);
+ if (removed) {
+ log.info("Success removed Task: {} from WorkerGroupDispatcher: {}",
+ taskExecutionRunnable.getId(), workerGroup);
+ } else {
+ log.info("Failed to remove Task: {} from WorkerGroupDispatcher:
{}, this task has been dispatched",
+ taskExecutionRunnable.getId(), workerGroup);
+ }
+ return removed;
+ }
+
+ public boolean existWorkerGroup(String workerGroup) {
+ return workerGroupDispatcherMap.containsKey(workerGroup);
+ }
+
+ /**
+ * Close all WorkerGroupDispatchers managed by this coordinator.
+ */
+ @Override
+ public void close() throws Exception {
+ log.info("WorkerGroupDispatcherCoordinator closing");
+ for (WorkerGroupDispatcher workerGroupDispatcher :
workerGroupDispatcherMap.values()) {
+ try {
+ workerGroupDispatcher.close();
+ } catch (Exception e) {
+ log.error("close WorkerGroupDispatcher: {} error",
workerGroupDispatcher.getName(), e);
+ }
+ }
+ log.info("WorkerGroupDispatcherCoordinator closed...");
+ }
+
+ private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String
workerGroup) {
+ return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
+ WorkerGroupDispatcher workerGroupDispatcher = new
WorkerGroupDispatcher(wg, taskExecutorClient);
+ workerGroupDispatcher.start();
+ return workerGroupDispatcher;
+ });
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
index cc82c1030c..e3a46f9f71 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
@@ -35,7 +36,6 @@ import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
import lombok.extern.slf4j.Slf4j;
@@ -50,7 +50,7 @@ import org.springframework.stereotype.Component;
public class TaskSubmittedStateAction extends AbstractTaskStateAction {
@Autowired
- private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
+ private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
@Autowired
private TaskInstanceDao taskInstanceDao;
@@ -107,7 +107,7 @@ public class TaskSubmittedStateAction extends
AbstractTaskStateAction {
taskInstance.getDelayTime(),
remainTimeMills);
}
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
remainTimeMills);
+ workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable,
remainTimeMills);
}
@Override
@@ -123,7 +123,7 @@ public class TaskSubmittedStateAction extends
AbstractTaskStateAction {
final ITaskExecutionRunnable
taskExecutionRunnable,
final TaskPauseLifecycleEvent taskPauseEvent)
{
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
- if
(globalTaskDispatchWaitingQueue.markTaskExecutionRunnableRemoved(taskExecutionRunnable))
{
+ if
(workerGroupDispatcherCoordinator.removeTask(taskExecutionRunnable)) {
log.info("Success pause task: {} before dispatch",
taskExecutionRunnable.getName());
taskExecutionRunnable.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecutionRunnable));
return;
@@ -144,7 +144,7 @@ public class TaskSubmittedStateAction extends
AbstractTaskStateAction {
final ITaskExecutionRunnable
taskExecutionRunnable,
final TaskKillLifecycleEvent taskKillEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
- if
(globalTaskDispatchWaitingQueue.markTaskExecutionRunnableRemoved(taskExecutionRunnable))
{
+ if
(workerGroupDispatcherCoordinator.removeTask(taskExecutionRunnable)) {
log.info("Success kill task: {} before dispatch",
taskExecutionRunnable.getName());
taskExecutionRunnable.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecutionRunnable));
return;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
deleted file mode 100644
index 2f07be37b4..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.queue.TimeBasedTaskExecutionRunnableComparableEntry;
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.PriorityBlockingQueue;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.stereotype.Component;
-
-/**
- * The class is used to store {@link ITaskExecutionRunnable} which needs to be
dispatched. The {@link ITaskExecutionRunnable}
- * will be stored in {@link DelayQueue}, if the {@link
ITaskExecutionRunnable}'s delay time is 0, then it will be
- * consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
- * <p>
- * The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is
determined by {@link ITaskExecutionRunnable#compareTo}.
- */
-@Slf4j
-@Component
-public class GlobalTaskDispatchWaitingQueue {
-
- private final Set<Integer> waitingTaskInstanceIds =
ConcurrentHashMap.newKeySet();
-
- private final DelayQueue<TimeBasedTaskExecutionRunnableComparableEntry>
delayQueue =
- new DelayQueue<>();
-
- /**
- * Submit a {@link ITaskExecutionRunnable} with delay time, if the delay
time <= 0 then it can be consumed.
- */
- public synchronized void
dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable
taskExecutionRunnable,
- long
delayTimeMills) {
-
waitingTaskInstanceIds.add(taskExecutionRunnable.getTaskInstance().getId());
- delayQueue.add(new
TimeBasedTaskExecutionRunnableComparableEntry(delayTimeMills,
taskExecutionRunnable));
- }
-
- /**
- * Consume {@link ITaskExecutionRunnable} from the {@link
PriorityBlockingQueue}, only the delay time <= 0 can be consumed.
- */
- @SneakyThrows
- public ITaskExecutionRunnable takeTaskExecuteRunnable() {
- ITaskExecutionRunnable taskExecutionRunnable =
(ITaskExecutionRunnable) delayQueue.take().getData();
- while (!markTaskExecutionRunnableRemoved(taskExecutionRunnable)) {
- taskExecutionRunnable = (ITaskExecutionRunnable)
delayQueue.take().getData();
- }
- return taskExecutionRunnable;
- }
-
- public int getWaitingDispatchTaskNumber() {
- return waitingTaskInstanceIds.size();
- }
-
- public synchronized boolean
markTaskExecutionRunnableRemoved(ITaskExecutionRunnable taskExecutionRunnable) {
- return
waitingTaskInstanceIds.remove(taskExecutionRunnable.getTaskInstance().getId());
- }
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
deleted file mode 100644
index 33a9b51b8d..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread
implements AutoCloseable {
-
- @Autowired
- private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
-
- @Autowired
- private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
-
- private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
-
- public GlobalTaskDispatchWaitingQueueLooper() {
- super("GlobalTaskDispatchWaitingQueueLooper");
- }
-
- @Override
- public synchronized void start() {
- if (!RUNNING_FLAG.compareAndSet(false, true)) {
- log.error("The GlobalTaskDispatchWaitingQueueLooper already
started, will not start again");
- return;
- }
- log.info("GlobalTaskDispatchWaitingQueueLooper starting...");
- super.start();
- log.info("GlobalTaskDispatchWaitingQueueLooper started...");
- }
-
- @Override
- public void run() {
- while (RUNNING_FLAG.get()) {
- doDispatch();
- }
- }
-
- void doDispatch() {
- final ITaskExecutionRunnable taskExecutionRunnable =
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
- final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
- try {
- final TaskExecutionStatus status = taskInstance.getState();
- if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status !=
TaskExecutionStatus.DELAY_EXECUTION) {
- log.warn("The TaskInstance {} state is : {}, will not
dispatch", taskInstance.getName(), status);
- return;
- }
- this.dispatchTaskToWorkerGroup(taskExecutionRunnable);
- } catch (Exception e) {
- this.delayRetryDispatch(taskExecutionRunnable, e);
- }
- }
-
- private void delayRetryDispatch(ITaskExecutionRunnable
taskExecutionRunnable, Exception e) {
- // If dispatch failed, will put the task back to the queue
- // The task will be dispatched after waiting time.
- // the waiting time will increase multiple of times, but will not
exceed 60 seconds
- long waitingTimeMills = Math.min(
-
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() *
1_000L, 60_000L);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
- waitingTimeMills);
- log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskExecutionRunnable.getTaskInstance().getName(),
- waitingTimeMills, e);
- }
-
- private void dispatchTaskToWorkerGroup(ITaskExecutionRunnable
taskExecutionRunnable) {
- workerGroupTaskDispatcherManager.addTaskToWorkerGroup(
- taskExecutionRunnable.getTaskInstance().getWorkerGroup(),
- taskExecutionRunnable, 0);
- }
-
- @Override
- public void close() throws Exception {
- if (RUNNING_FLAG.compareAndSet(true, false)) {
- log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
- workerGroupTaskDispatcherManager.close();
- log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
- } else {
- log.error("GlobalTaskDispatchWaitingQueueLooper is not started");
- }
- }
-
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
deleted file mode 100644
index 456f2cd94c..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * WorkerGroupTaskDispatcherManager is responsible for managing the task
dispatching for worker groups.
- * It maintains a mapping of worker groups to their task dispatchers and
priority delay queues,
- * and supports adding tasks, starting and stopping worker groups, as well as
cleaning up resources upon shutdown.
- */
-@Component
-@Slf4j
-public class WorkerGroupTaskDispatcherManager implements AutoCloseable {
-
- @Autowired
- private ITaskExecutorClient taskExecutorClient;
-
- @Getter
- private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher>
dispatchWorkerMap;
-
- public WorkerGroupTaskDispatcherManager() {
- dispatchWorkerMap = new ConcurrentHashMap<>();
- }
-
- /**
- * Adds a task to the specified worker group queue and starts or wakes up
the corresponding processing loop.
- *
- * @param workerGroup the identifier for the worker group, used to
distinguish different task queues
- * @param taskExecutionRunnable an instance of ITaskExecutionRunnable
representing the task to be executed
- * @param delayTimeMills the delay time before the task is executed, in
milliseconds
- */
- public synchronized void addTaskToWorkerGroup(String workerGroup,
ITaskExecutionRunnable taskExecutionRunnable,
- long delayTimeMills) {
- WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.computeIfAbsent(
- workerGroup, key -> new WorkerGroupTaskDispatcher(workerGroup,
taskExecutorClient));
- if (!workerGroupTaskDispatcher.isAlive()) {
- workerGroupTaskDispatcher.start();
- }
-
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable,
delayTimeMills);
- }
-
- /**
- * Stop all workerGroupTaskDispatchWaitingQueueLooper
- */
- @Override
- public void close() throws Exception {
- log.info("WorkerGroupTaskDispatcherManager start close");
- for (Map.Entry<String, WorkerGroupTaskDispatcher> entry :
dispatchWorkerMap.entrySet()) {
- try {
- entry.getValue().close();
- } catch (Exception e) {
- log.error("close worker group error", e);
- }
- }
- log.info("WorkerGroupTaskDispatcherManager closed");
- }
-}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
new file mode 100644
index 0000000000..9d39de3a20
--- /dev/null
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class WorkerGroupDispatcherCoordinatorTest {
+
+ @InjectMocks
+ private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
+
+ @Mock
+ private ITaskExecutorClient taskExecutorClient;
+
+ @Test
+ void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
+ String workerGroup = "newGroup";
+ long delayTimeMills = 1000;
+
+ ITaskExecutionRunnable taskExecutionRunnable =
Mockito.mock(ITaskExecutionRunnable.class);
+ TaskInstance taskInstance = mock(TaskInstance.class);
+
+ when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+ when(taskInstance.getWorkerGroup()).thenReturn(workerGroup);
+
+
assertFalse(workerGroupDispatcherCoordinator.existWorkerGroup(workerGroup));
+
+ workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable,
delayTimeMills);
+
+
assertTrue(workerGroupDispatcherCoordinator.existWorkerGroup(workerGroup));
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
similarity index 53%
rename from
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
rename to
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
index a4be0d4213..7f5b0f4c8c 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
@@ -15,102 +15,77 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
-import static org.apache.dolphinscheduler.common.thread.ThreadUtils.sleep;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
import java.time.Duration;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-class WorkerGroupTaskDispatcherTest {
+class WorkerGroupDispatcherTest {
- private WorkerGroupTaskDispatcher dispatcher;
+ private WorkerGroupDispatcher dispatcher;
private ITaskExecutorClient taskExecutorClient;
@BeforeEach
void setUp() {
taskExecutorClient = mock(ITaskExecutorClient.class);
- dispatcher = new WorkerGroupTaskDispatcher("TestGroup",
taskExecutorClient);
+ dispatcher = new WorkerGroupDispatcher("TestGroup",
taskExecutorClient);
}
@Test
- void addTaskToWorkerGroupQueue_StatusAllowed_TaskAdded() {
- // Arrange
+ void dispatchTask() {
ITaskExecutionRunnable taskExecutionRunnable =
mock(ITaskExecutionRunnable.class);
TaskInstance taskInstance = mock(TaskInstance.class);
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-
doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
dispatcher.start();
- // Act
- dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
-
- // Assert
- assertFalse(dispatcher.queueSize() == 0);
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+ await()
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> verify(taskExecutorClient,
times(1)).dispatch(taskExecutionRunnable));
}
@Test
- void addTaskToWorkerGroupQueue_StatusNotAllowed_TaskNotAdded() {
- // Arrange
+ void dispatchTask_withDelay() {
ITaskExecutionRunnable taskExecutionRunnable =
mock(ITaskExecutionRunnable.class);
TaskInstance taskInstance = mock(TaskInstance.class);
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-
doReturn(TaskExecutionStatus.RUNNING_EXECUTION).when(taskInstance).getState();
-
- // Act
- dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
-
- // Assert
- assertTrue(dispatcher.queueSize() > 0);
- }
-
- @Test
- void start_DispatcherStartsSuccessfully() {
- // Act
dispatcher.start();
- // Assert
- assertTrue(dispatcher.isAlive());
+ dispatcher.dispatchTask(taskExecutionRunnable, 2000);
+ await()
+ .atLeast(Duration.ofMillis(1500))
+ .untilAsserted(() -> verify(taskExecutorClient,
times(1)).dispatch(taskExecutionRunnable));
}
@Test
- void dispatch_TaskDispatchedSuccessfully() throws TaskDispatchException {
- // Arrange
+ void dispatchTask_HasBeenRemoved() {
ITaskExecutionRunnable taskExecutionRunnable =
mock(ITaskExecutionRunnable.class);
TaskInstance taskInstance = mock(TaskInstance.class);
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-
doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
- doNothing().when(taskExecutorClient).dispatch(any());
- dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
- // Act
- dispatcher.start();
- sleep(100); // Give some time for the dispatcher to run
- dispatcher.close();
- Awaitility.await()
- .atMost(Duration.ofSeconds(1)).untilAsserted(
- () -> verify(taskExecutorClient,
atLeastOnce()).dispatch(taskExecutionRunnable));
- // Assert
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+ dispatcher.removeTask(taskExecutionRunnable);
+ dispatcher.start();
+ await()
+ .pollDelay(Duration.ofSeconds(2))
+ .untilAsserted(() -> verify(taskExecutorClient,
times(0)).dispatch(taskExecutionRunnable));
}
@Test
@@ -119,18 +94,16 @@ class WorkerGroupTaskDispatcherTest {
ITaskExecutionRunnable taskExecutionRunnable =
mock(ITaskExecutionRunnable.class);
TaskInstance taskInstance = mock(TaskInstance.class);
when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-
doReturn(TaskExecutionStatus.SUBMITTED_SUCCESS).when(taskInstance).getState();
- doThrow(new
RuntimeException()).when(taskExecutorClient).dispatch(any());
- dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, 0);
+ when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(new
TaskExecutionContext());
- // Act
+ doThrow(new
RuntimeException()).when(taskExecutorClient).dispatch(any());
dispatcher.start();
- sleep(100); // Give some time for the dispatcher to run
- dispatcher.close();
- // Assert
- Awaitility.await()
- .atMost(Duration.ofSeconds(1)).untilAsserted(
- () -> verify(taskExecutorClient,
atLeastOnce()).dispatch(any()));
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ await()
+ .pollDelay(Duration.ofSeconds(2))
+ .untilAsserted(() -> verify(taskExecutorClient,
times(2)).dispatch(taskExecutionRunnable));
}
+
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
deleted file mode 100644
index 0cc5571f01..0000000000
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooperTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
-class GlobalTaskDispatchWaitingQueueLooperTest {
-
- @InjectMocks
- private GlobalTaskDispatchWaitingQueueLooper
globalTaskDispatchWaitingQueueLooper;
-
- @Mock
- private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
-
- @Mock
- private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
-
- @Mock
- private ITaskExecutionRunnable taskExecutionRunnable;
-
- @Test
- void testTaskExecutionRunnableStatusIsSubmittedNoWorkerGroup() {
-
-
when(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).thenReturn(taskExecutionRunnable);
- TaskInstance taskInstance = mock(TaskInstance.class);
-
when(taskInstance.getState()).thenReturn(TaskExecutionStatus.SUBMITTED_SUCCESS);
- when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
-
when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(mock(TaskExecutionContext.class));
- globalTaskDispatchWaitingQueueLooper.doDispatch();
-
- verify(workerGroupTaskDispatcherManager,
times(1)).addTaskToWorkerGroup(any(),
- any(ITaskExecutionRunnable.class),
- anyLong());
-
- }
-
-}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
deleted file mode 100644
index 209c1b4376..0000000000
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
-import
org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
-
-import org.apache.commons.lang3.RandomUtils;
-
-import java.time.Duration;
-import java.util.Date;
-import java.util.concurrent.CompletableFuture;
-
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionTimeoutException;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.context.ApplicationContext;
-
-@ExtendWith(MockitoExtension.class)
-class GlobalTaskDispatchWaitingQueueTest {
-
- @InjectMocks
- private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
-
- @Test
- void submitTaskExecuteRunnable() {
- ITaskExecutionRunnable iTaskExecutionRunnable =
createTaskExecuteRunnable();
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable,
500);
- Awaitility.await()
- .atMost(Duration.ofSeconds(1))
- .untilAsserted(
- () ->
Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()));
- }
-
- @Test
- void testSubmitTaskExecuteRunnableWithDelay() {
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(createTaskExecuteRunnable(),
3_000L);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(createTaskExecuteRunnable(),
500);
-
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull();
- }
-
- @Test
- void takeTaskExecuteRunnable_NoElementShouldBlock() {
- CompletableFuture<Void> completableFuture =
- CompletableFuture.runAsync(() ->
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable());
- assertThrowsExactly(ConditionTimeoutException.class,
- () -> await()
- .atLeast(Duration.ofSeconds(2))
- .timeout(Duration.ofSeconds(3))
- .until(completableFuture::isDone));
- }
-
- @Test
- void takeTaskExecuteRunnable_withDifferentTaskInstanceDelay() {
- ITaskExecutionRunnable taskExecutionRunnable1 =
createTaskExecuteRunnable();
- taskExecutionRunnable1.getTaskInstance().setId(1);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable1,
0);
-
- ITaskExecutionRunnable iTaskExecutionRunnable2 =
createTaskExecuteRunnable();
- iTaskExecutionRunnable2.getTaskInstance().setId(2);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable2,
1);
-
- ITaskExecutionRunnable iTaskExecutionRunnable3 =
createTaskExecuteRunnable();
- iTaskExecutionRunnable3.getTaskInstance().setId(3);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable3,
2);
-
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
- .isEqualTo(1);
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
- .isEqualTo(2);
-
assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId())
- .isEqualTo(3);
- }
-
- @Test
- void getWaitingDispatchTaskNumber() {
- Assertions.assertEquals(0,
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
- ITaskExecutionRunnable iTaskExecutionRunnable =
createTaskExecuteRunnable();
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable,
500);
- Assertions.assertEquals(1,
globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber());
- }
-
- private ITaskExecutionRunnable createTaskExecuteRunnable() {
- WorkflowInstance workflowInstance = new WorkflowInstance();
- workflowInstance.setWorkflowInstancePriority(Priority.MEDIUM);
-
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(RandomUtils.nextInt());
- taskInstance.setTaskInstancePriority(Priority.MEDIUM);
- taskInstance.setFirstSubmitTime(new Date());
-
- final ApplicationContext applicationContext =
mock(ApplicationContext.class);
- when(applicationContext.getBean(TaskExecutionContextFactory.class))
- .thenReturn(mock(TaskExecutionContextFactory.class));
- final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
TaskExecutionRunnableBuilder.builder()
- .applicationContext(applicationContext)
- .workflowInstance(workflowInstance)
- .taskInstance(taskInstance)
- .workflowExecutionGraph(new WorkflowExecutionGraph())
- .workflowDefinition(new WorkflowDefinition())
- .project(new Project())
- .taskDefinition(new TaskDefinition())
- .workflowEventBus(new WorkflowEventBus())
- .build();
- return new TaskExecutionRunnable(taskExecutionRunnableBuilder);
- }
-}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
deleted file mode 100644
index 6bfe96e4b5..0000000000
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManagerTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.test.util.ReflectionTestUtils;
-
-@ExtendWith(MockitoExtension.class)
-class WorkerGroupTaskDispatcherManagerTest {
-
- @InjectMocks
- private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager;
-
- @Mock
- private ITaskExecutionRunnable taskExecutionRunnable;
-
- @Test
- void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
- String workerGroup = "newGroup";
- long delayTimeMills = 1000;
-
- workerGroupTaskDispatcherManager.addTaskToWorkerGroup(workerGroup,
taskExecutionRunnable, delayTimeMills);
-
- ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap
=
- workerGroupTaskDispatcherManager.getDispatchWorkerMap();
-
- assertTrue(dispatchWorkerMap.containsKey(workerGroup));
- }
-
- @Test
- void addTaskToWorkerGroup_ExistingWorkerGroup_ShouldAddTask() {
- String workerGroup = "existingGroup";
- long delayTimeMills = 1000;
-
- WorkerGroupTaskDispatcher mockDispatcher =
mock(WorkerGroupTaskDispatcher.class);
-
- ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap
= new ConcurrentHashMap<>();
- dispatchWorkerMap.put(workerGroup, mockDispatcher);
-
- ReflectionTestUtils.setField(workerGroupTaskDispatcherManager,
"dispatchWorkerMap", dispatchWorkerMap);
- doNothing().when(mockDispatcher).start();
- workerGroupTaskDispatcherManager.addTaskToWorkerGroup(workerGroup,
taskExecutionRunnable, delayTimeMills);
-
- verify(mockDispatcher,
times(1)).addTaskToWorkerGroupQueue(taskExecutionRunnable, delayTimeMills);
- }
-
- @Test
- void close_ShouldCloseAllWorkerGroups() throws Exception {
- WorkerGroupTaskDispatcher mockDispatcher1 =
mock(WorkerGroupTaskDispatcher.class);
- WorkerGroupTaskDispatcher mockDispatcher2 =
mock(WorkerGroupTaskDispatcher.class);
-
- ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap
= new ConcurrentHashMap<>();
- dispatchWorkerMap.put("group1", mockDispatcher1);
- dispatchWorkerMap.put("group2", mockDispatcher2);
-
- ReflectionTestUtils.setField(workerGroupTaskDispatcherManager,
"dispatchWorkerMap", dispatchWorkerMap);
-
- workerGroupTaskDispatcherManager.close();
-
- verify(mockDispatcher1, times(1)).close();
- verify(mockDispatcher2, times(1)).close();
- }
-}
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
index f0c855c36f..4fc57137e9 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
@@ -31,17 +31,17 @@ public class TaskExecutorMDCUtils {
return logWithMDC(taskExecutor.getId(),
taskExecutor.getTaskExecutionContext().getLogPath());
}
- public static MDCAutoClosable logWithMDC(final int taskExecutorId) {
- return logWithMDC(taskExecutorId, null);
+ public static MDCAutoClosable logWithMDC(final int taskInstanceId) {
+ return logWithMDC(taskInstanceId, null);
}
- public static MDCAutoClosable logWithMDC(final int taskExecutorId, final
String logPath) {
+ public static MDCAutoClosable logWithMDC(final int taskInstanceId, final
String logPath) {
if (logPath != null) {
MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, logPath);
}
- MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY,
String.valueOf(taskExecutorId));
+ MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY,
String.valueOf(taskInstanceId));
return () -> {
MDC.remove(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY);