This is an automated email from the ASF dual-hosted git repository.

gallardot pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f9b0e044c3 Retry TaskKillLifecycleEvent/TaskPauseLifecycleEvent when task instance is submitted but already dispatched (#17203)
f9b0e044c3 is described below

commit f9b0e044c32a4426f87a4b6e0af18ed3e35d3860
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 26 16:50:51 2025 +0800

    Retry TaskKillLifecycleEvent/TaskPauseLifecycleEvent when task instance is submitted but already dispatched (#17203)
---
 .../dolphinscheduler/eventbus/AbstractDelayEvent.java      |  1 +
 .../engine/task/dispatcher/WorkerGroupDispatcher.java      |  4 ++++
 .../task/lifecycle/event/TaskKillLifecycleEvent.java       | 13 ++++++++++---
 .../task/lifecycle/event/TaskPauseLifecycleEvent.java      | 13 ++++++++++---
 .../engine/task/statemachine/AbstractTaskStateAction.java  |  4 ++--
 .../engine/task/statemachine/TaskDispatchStateAction.java  |  3 ---
 .../engine/task/statemachine/TaskSubmittedStateAction.java | 14 +++++++++++---
 .../task/executor/log/TaskExecutorMDCUtils.java            |  3 +--
 8 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
index c013171377..f046b0bac0 100644
--- a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
+++ b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
@@ -34,6 +34,7 @@ public abstract class AbstractDelayEvent implements IEvent, Delayed {
 
     private static final long DEFAULT_DELAY_TIME = 0;
 
+    // In milliseconds
     protected long delayTime;
 
     @Builder.Default
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
index d061f29aa8..ef909ed7f0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
 
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
 import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
@@ -79,7 +80,10 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
             try (
                     TaskExecutorMDCUtils.MDCAutoClosable ignore =
                             TaskExecutorMDCUtils.logWithMDC(taskExecutionRunnable.getId())) {
+                LogUtils.setWorkflowInstanceIdMDC(taskExecutionRunnable.getTaskInstance().getWorkflowInstanceId());
                 doDispatchTask(taskExecutionRunnable);
+            } finally {
+                LogUtils.removeWorkflowInstanceIdMDC();
             }
         }
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java
index 35acd4956a..568c6f5a18 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskKillLifecycleEvent.java
@@ -22,17 +22,24 @@ import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractT
 import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
 import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 
 @Getter
-@AllArgsConstructor
 public class TaskKillLifecycleEvent extends AbstractTaskLifecycleEvent {
 
     private final ITaskExecutionRunnable taskExecutionRunnable;
 
+    private TaskKillLifecycleEvent(ITaskExecutionRunnable taskExecutionRunnable, long delayTime) {
+        super(delayTime);
+        this.taskExecutionRunnable = taskExecutionRunnable;
+    }
+
     public static TaskKillLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable) {
-        return new TaskKillLifecycleEvent(taskExecutionRunnable);
+        return of(taskExecutionRunnable, 0);
+    }
+
+    public static TaskKillLifecycleEvent of(final ITaskExecutionRunnable taskExecutionRunnable, long delayTime) {
+        return new TaskKillLifecycleEvent(taskExecutionRunnable, delayTime);
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java
index 3e0ab2038c..beb8d724c7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskPauseLifecycleEvent.java
@@ -22,17 +22,24 @@ import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractT
 import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
 import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 
 @Getter
-@AllArgsConstructor
 public class TaskPauseLifecycleEvent extends AbstractTaskLifecycleEvent {
 
     private final ITaskExecutionRunnable taskExecutionRunnable;
 
+    private TaskPauseLifecycleEvent(final ITaskExecutionRunnable taskExecutionRunnable, long delayTime) {
+        super(delayTime);
+        this.taskExecutionRunnable = taskExecutionRunnable;
+    }
+
     public static TaskPauseLifecycleEvent of(ITaskExecutionRunnable taskExecutionRunnable) {
-        return new TaskPauseLifecycleEvent(taskExecutionRunnable);
+        return of(taskExecutionRunnable, 0);
+    }
+
+    public static TaskPauseLifecycleEvent of(ITaskExecutionRunnable taskExecutionRunnable, long delayTime) {
+        return new TaskPauseLifecycleEvent(taskExecutionRunnable, delayTime);
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index 3f79d71efa..52bc662c43 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -229,7 +229,7 @@ public abstract class AbstractTaskStateAction implements ITaskStateAction {
     protected void tryToDispatchTask(final ITaskExecutionRunnable taskExecutionRunnable) {
         if (isTaskNeedAcquireTaskGroupSlot(taskExecutionRunnable)) {
             acquireTaskGroupSlot(taskExecutionRunnable);
-            log.info("Task{} using taskGroup, success acquire taskGroup slot", taskExecutionRunnable.getName());
+            log.info("Task[name={}] using taskGroup, success acquire taskGroup slot", taskExecutionRunnable.getName());
             return;
         }
         taskExecutionRunnable.getWorkflowEventBus().publish(TaskDispatchLifecycleEvent.of(taskExecutionRunnable));
@@ -262,7 +262,7 @@ public abstract class AbstractTaskStateAction implements ITaskStateAction {
     protected void logWarningIfCannotDoAction(final ITaskExecutionRunnable taskExecutionRunnable,
                                               final AbstractLifecycleEvent event) {
         final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
-        log.warn("Task {} state is {} cannot do action on event: {}",
+        log.warn("Task[name={}] state is {} cannot do action on event: {}",
                 taskInstance.getName(),
                 taskInstance.getState(),
                 event);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskDispatchStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskDispatchStateAction.java
index acb31bb087..af18d97228 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskDispatchStateAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskDispatchStateAction.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.engine.task.statemachine;
 
-import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.master.engine.task.client.TaskExecutorClient;
 import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
@@ -44,8 +43,6 @@ import org.springframework.stereotype.Component;
 @Component
 public class TaskDispatchStateAction extends AbstractTaskStateAction {
 
-    @Autowired
-    private TaskInstanceDao taskInstanceDao;
     @Autowired
     private TaskExecutorClient taskExecutorClient;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
index e3a46f9f71..ba42fa6cc4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java
@@ -37,6 +37,8 @@ import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.Tas
 import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
 
+import java.util.concurrent.TimeUnit;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -128,7 +130,10 @@ public class TaskSubmittedStateAction extends AbstractTaskStateAction {
             taskExecutionRunnable.getWorkflowEventBus().publish(TaskPausedLifecycleEvent.of(taskExecutionRunnable));
             return;
         }
-        logWarningIfCannotDoAction(taskExecutionRunnable, taskPauseEvent);
+        log.info("The task[id={}] is submitted and already dispatched, cannot pause, will try to pause it after 5s",
+                taskExecutionRunnable.getId());
+        taskExecutionRunnable.getWorkflowEventBus()
+                .publish(TaskPauseLifecycleEvent.of(taskExecutionRunnable, TimeUnit.SECONDS.toMillis(5)));
     }
 
     @Override
@@ -145,11 +150,14 @@ public class TaskSubmittedStateAction extends AbstractTaskStateAction {
                                 final TaskKillLifecycleEvent taskKillEvent) {
         throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
         if (workerGroupDispatcherCoordinator.removeTask(taskExecutionRunnable)) {
-            log.info("Success kill task: {} before dispatch", taskExecutionRunnable.getName());
+            log.info("Success kill task[id={}] before dispatch", taskExecutionRunnable.getId());
             taskExecutionRunnable.getWorkflowEventBus().publish(TaskKilledLifecycleEvent.of(taskExecutionRunnable));
             return;
         }
-        logWarningIfCannotDoAction(taskExecutionRunnable, taskKillEvent);
+        log.info("The task[id={}] is submitted and already dispatched, cannot kill, will kill it after 5s",
+                taskExecutionRunnable.getId());
+        taskExecutionRunnable.getWorkflowEventBus()
+                .publish(TaskKillLifecycleEvent.of(taskExecutionRunnable, TimeUnit.SECONDS.toMillis(5)));
     }
 
     @Override
diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
index 4fc57137e9..cf964fad0d 100644
--- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
+++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/log/TaskExecutorMDCUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.task.executor.log;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
 
 import org.slf4j.MDC;
@@ -41,7 +40,7 @@ public class TaskExecutorMDCUtils {
             MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, logPath);
         }
 
-        MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
+        MDC.put(TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
 
         return () -> {
             MDC.remove(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY);
Reply via email to