This is an automated email from the ASF dual-hosted git repository.
gallardot pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f9b0e044c3 Retry TaskKillLifecycleEvent/TaskPauseLifecycleEvent when
task instance is submitted but already dispatched (#17203)
f9b0e044c3 is described below
commit f9b0e044c32a4426f87a4b6e0af18ed3e35d3860
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 26 16:50:51 2025 +0800
Retry TaskKillLifecycleEvent/TaskPauseLifecycleEvent when task instance is
submitted but already dispatched (#17203)
---
.../dolphinscheduler/eventbus/AbstractDelayEvent.java | 1 +
.../engine/task/dispatcher/WorkerGroupDispatcher.java | 4 ++++
.../task/lifecycle/event/TaskKillLifecycleEvent.java | 13 ++++++++++---
.../task/lifecycle/event/TaskPauseLifecycleEvent.java | 13 ++++++++++---
.../engine/task/statemachine/AbstractTaskStateAction.java | 4 ++--
.../engine/task/statemachine/TaskDispatchStateAction.java | 3 ---
.../engine/task/statemachine/TaskSubmittedStateAction.java | 14 +++++++++++---
.../task/executor/log/TaskExecutorMDCUtils.java | 3 +--
8 files changed, 39 insertions(+), 16 deletions(-)
diff --git
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
index c013171377..f046b0bac0 100644
---
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
+++
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
@@ -34,6 +34,7 @@ public abstract class AbstractDelayEvent implements IEvent,
Delayed {
private static final long DEFAULT_DELAY_TIME = 0;
+ // In milliseconds
protected long delayTime;
@Builder.Default
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
index d061f29aa8..ef909ed7f0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
@@ -79,7 +80,10 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
try (
TaskExecutorMDCUtils.MDCAutoClosable ignore =
TaskExecutorMDCUtils.logWithMDC(taskExecutionRunnable.getId())) {
+
LogUtils.setWorkflowInstanceIdMDC(taskExecutionRunnable.getTaskInstance().getWorkflowInstanceId());
doDispatchTask(taskExecutionRunnable);
+ } finally {
+ LogUtils.removeWorkflowInstanceIdMDC();
}
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java
index 35acd4956a..568c6f5a18 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java
@@ -22,17 +22,24 @@ import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractT
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
-@AllArgsConstructor
public class TaskKillLifecycleEvent extends AbstractTaskLifecycleEvent {
private final ITaskExecutionRunnable taskExecutionRunnable;
+ private TaskKillLifecycleEvent(ITaskExecutionRunnable
taskExecutionRunnable, long delayTime) {
+ super(delayTime);
+ this.taskExecutionRunnable = taskExecutionRunnable;
+ }
+
public static TaskKillLifecycleEvent of(final ITaskExecutionRunnable
taskExecutionRunnable) {
- return new TaskKillLifecycleEvent(taskExecutionRunnable);
+ return of(taskExecutionRunnable, 0);
+ }
+
+ public static TaskKillLifecycleEvent of(final ITaskExecutionRunnable
taskExecutionRunnable, long delayTime) {
+ return new TaskKillLifecycleEvent(taskExecutionRunnable, delayTime);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java
index 3e0ab2038c..beb8d724c7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java
@@ -22,17 +22,24 @@ import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractT
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
-@AllArgsConstructor
public class TaskPauseLifecycleEvent extends AbstractTaskLifecycleEvent {
private final ITaskExecutionRunnable taskExecutionRunnable;
+ private TaskPauseLifecycleEvent(final ITaskExecutionRunnable
taskExecutionRunnable, long delayTime) {
+ super(delayTime);
+ this.taskExecutionRunnable = taskExecutionRunnable;
+ }
+
public static TaskPauseLifecycleEvent of(ITaskExecutionRunnable
taskExecutionRunnable) {
- return new TaskPauseLifecycleEvent(taskExecutionRunnable);
+ return of(taskExecutionRunnable, 0);
+ }
+
+ public static TaskPauseLifecycleEvent of(ITaskExecutionRunnable
taskExecutionRunnable, long delayTime) {
+ return new TaskPauseLifecycleEvent(taskExecutionRunnable, delayTime);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index 3f79d71efa..52bc662c43 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -229,7 +229,7 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
protected void tryToDispatchTask(final ITaskExecutionRunnable
taskExecutionRunnable) {
if (isTaskNeedAcquireTaskGroupSlot(taskExecutionRunnable)) {
acquireTaskGroupSlot(taskExecutionRunnable);
- log.info("Task{} using taskGroup, success acquire taskGroup slot",
taskExecutionRunnable.getName());
+ log.info("Task[name={}] using taskGroup, success acquire taskGroup
slot", taskExecutionRunnable.getName());
return;
}
taskExecutionRunnable.getWorkflowEventBus().publish(TaskDispatchLifecycleEvent.of(taskExecutionRunnable));
@@ -262,7 +262,7 @@ public abstract class AbstractTaskStateAction implements
ITaskStateAction {
protected void logWarningIfCannotDoAction(final ITaskExecutionRunnable
taskExecutionRunnable,
final AbstractLifecycleEvent
event) {
final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
- log.warn("Task {} state is {} cannot do action on event: {}",
+ log.warn("Task[name={}] state is {} cannot do action on event: {}",
taskInstance.getName(),
taskInstance.getState(),
event);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskDispatchStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskDispatchStateAction.java
index acb31bb087..af18d97228 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskDispatchStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskDispatchStateAction.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.engine.task.statemachine;
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import
org.apache.dolphinscheduler.server.master.engine.task.client.TaskExecutorClient;
import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
@@ -44,8 +43,6 @@ import org.springframework.stereotype.Component;
@Component
public class TaskDispatchStateAction extends AbstractTaskStateAction {
- @Autowired
- private TaskInstanceDao taskInstanceDao;
@Autowired
private TaskExecutorClient taskExecutorClient;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
index e3a46f9f71..ba42fa6cc4 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
@@ -37,6 +37,8 @@ import
org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
+import java.util.concurrent.TimeUnit;
+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -128,7 +130,10 @@ public class TaskSubmittedStateAction extends
AbstractTaskStateAction {
taskExecutionRunnable.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecutionRunnable));
return;
}
- logWarningIfCannotDoAction(taskExecutionRunnable, taskPauseEvent);
+ log.info("The task[id={}] is submitted and already dispatched, cannot
pause, will try to pause it after 5s",
+ taskExecutionRunnable.getId());
+ taskExecutionRunnable.getWorkflowEventBus()
+ .publish(TaskPauseLifecycleEvent.of(taskExecutionRunnable,
TimeUnit.SECONDS.toMillis(5))); // delayTime is in milliseconds, so 5s must be converted to 5000
}
@Override
@@ -145,11 +150,14 @@ public class TaskSubmittedStateAction extends
AbstractTaskStateAction {
final TaskKillLifecycleEvent taskKillEvent) {
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
if
(workerGroupDispatcherCoordinator.removeTask(taskExecutionRunnable)) {
- log.info("Success kill task: {} before dispatch",
taskExecutionRunnable.getName());
+ log.info("Success kill task[id={}] before dispatch",
taskExecutionRunnable.getId());
taskExecutionRunnable.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecutionRunnable));
return;
}
- logWarningIfCannotDoAction(taskExecutionRunnable, taskKillEvent);
+ log.info("The task[id={}] is submitted and already dispatched, cannot
kill, will kill it after 5s",
+ taskExecutionRunnable.getId());
+ taskExecutionRunnable.getWorkflowEventBus()
+ .publish(TaskKillLifecycleEvent.of(taskExecutionRunnable,
TimeUnit.SECONDS.toMillis(5))); // delayTime is in milliseconds, so 5s must be converted to 5000
}
@Override
diff --git
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
index 4fc57137e9..cf964fad0d 100644
---
a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
+++
b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.task.executor.log;
-import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
import org.slf4j.MDC;
@@ -41,7 +40,7 @@ public class TaskExecutorMDCUtils {
MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, logPath);
}
- MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY,
String.valueOf(taskInstanceId));
+ MDC.put(TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
return () -> {
MDC.remove(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY);