This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5d63e417ba [Fix-17350] Fix issues of sub-workflow failover from 
different master server (#17352)
5d63e417ba is described below

commit 5d63e417ba17935f0bddb91366aefa46e6f20c99
Author: lile <[email protected]>
AuthorDate: Fri Jul 25 09:38:40 2025 +0800

    [Fix-17350] Fix issues of sub-workflow failover from different master 
server (#17352)
---
 .../common/enums/WorkflowExecutionStatus.java      |  16 +-
 .../dao/mapper/WorkflowInstanceMapper.java         |  10 -
 .../repository/impl/WorkflowInstanceDaoImpl.java   |   2 +-
 .../dao/mapper/WorkflowInstanceMapper.xml          |  16 -
 .../plugin/subworkflow/SubWorkflowLogicTask.java   |   7 +-
 .../cases/WorkflowInstanceFailoverTestCase.java    | 477 ++++++++++++++++++++-
 ...h_sub_workflow_not_running_in_diff_master.yaml} | 150 ++++++-
 ...h_sub_workflow_not_running_in_diff_master.yaml} | 150 ++++++-
 ...h_sub_workflow_not_running_in_diff_master.yaml} |  48 +--
 ...lowInstance_with_sub_workflow_task_running.yaml |   4 +
 ..._sub_workflow_task_running_in_diff_master.yaml} |   8 +-
 11 files changed, 774 insertions(+), 114 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
index 6d96b31bb6..1cf115b4a2 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.common.enums;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -94,8 +93,19 @@ public enum WorkflowExecutionStatus {
                 || this == SERIAL_WAIT;
     }
 
-    public boolean canFailover() {
-        return Arrays.stream(NEED_FAILOVER_STATES).anyMatch(x -> x == 
this.getCode());
+    /**
+     * status can be take over on sub-workflow
+     * @return bool
+     */
+    public boolean canTakeover() {
+        return this == RUNNING_EXECUTION
+                || this == READY_PAUSE
+                || this == PAUSE
+                || this == READY_STOP
+                || this == STOP
+                || this == FAILURE
+                || this == SUCCESS
+                || this == FAILOVER;
     }
 
     public boolean canDirectPauseInDB() {
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
index b7a8de4cec..dee3a559b5 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
@@ -56,16 +56,6 @@ public interface WorkflowInstanceMapper extends 
BaseMapper<WorkflowInstance> {
     List<WorkflowInstance> queryByHostAndStatus(@Param("host") String host,
                                                 @Param("states") int[] 
stateArray);
 
-    /**
-     * query workflow instance by host and stateArray which is not sub workflow
-     *
-     * @param host       host
-     * @param stateArray stateArray
-     * @return workflow instance list
-     */
-    List<WorkflowInstance> queryMainWorkflowByHostAndStatus(@Param("host") 
String host,
-                                                            @Param("states") 
int[] stateArray);
-
     /**
      * query workflow instance host by stateArray
      *
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
index 7ec792d864..cfc381ab8a 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
@@ -176,7 +176,7 @@ public class WorkflowInstanceDaoImpl extends 
BaseDao<WorkflowInstance, WorkflowI
 
     @Override
     public List<WorkflowInstance> queryNeedFailoverWorkflowInstances(String 
masterAddress) {
-        return mybatisMapper.queryMainWorkflowByHostAndStatus(masterAddress,
+        return mybatisMapper.queryByHostAndStatus(masterAddress,
                 
WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
     }
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
index 24ca24ab57..fc2fb6acc2 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
@@ -50,22 +50,6 @@
         </if>
         order by id asc
     </select>
-    <select id="queryMainWorkflowByHostAndStatus" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
-        select
-        <include refid="baseSql"/>
-        from t_ds_workflow_instance
-        where is_sub_workflow=0
-        <if test="host != null and host != ''">
-            and host=#{host}
-        </if>
-        <if test="states != null and states.length != 0">
-            and state in
-            <foreach collection="states" item="i" open="(" close=")" 
separator=",">
-                #{i}
-            </foreach>
-        </if>
-        order by id asc
-    </select>
     <select id="queryNeedFailoverWorkflowInstanceHost" resultType="String">
         select distinct host
         from t_ds_workflow_instance
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
index a156760fb5..82c435583f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
@@ -40,7 +40,6 @@ import 
org.apache.dolphinscheduler.server.master.engine.executor.plugin.Abstract
 import 
org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
 import 
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
 import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import org.apache.dolphinscheduler.server.master.failover.WorkflowFailover;
 import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
 import 
org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent;
 
@@ -168,9 +167,9 @@ public class SubWorkflowLogicTask extends 
AbstractLogicTask<SubWorkflowParameter
         final WorkflowInstance subWorkflowInstance = 
workflowInstanceDao.queryById(
                 subWorkflowLogicTaskRuntimeContext.getSubWorkflowInstanceId());
 
-        if (subWorkflowInstance != null && 
subWorkflowInstance.getState().canFailover()) {
-            // Only handle sub-workflow's fail-over in SubWorkflowLogicTask's 
fail-over
-            
applicationContext.getBean(WorkflowFailover.class).failoverWorkflow(subWorkflowInstance);
+        if (subWorkflowInstance != null && 
subWorkflowInstance.getState().canTakeover()) {
+            // Here we only need to take over the runtime context of 
sub-workflow,
+            // the sub-workflow will be failover by master-server when needed.
             return subWorkflowLogicTaskRuntimeContext;
         }
 
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
index 2967109ac9..ab6f751df4 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceFailoverTestCase.java
@@ -21,9 +21,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
 import org.apache.dolphinscheduler.extract.base.client.Clients;
 import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
 import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
@@ -31,8 +33,11 @@ import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowI
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
 import 
org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
+import org.apache.dolphinscheduler.server.master.cluster.MasterServerMetadata;
 import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
 import 
org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
+import 
org.apache.dolphinscheduler.server.master.engine.system.event.MasterFailoverEvent;
+import org.apache.dolphinscheduler.server.master.failover.FailoverCoordinator;
 import 
org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
 
 import org.apache.commons.lang3.StringUtils;
@@ -49,6 +54,9 @@ public class WorkflowInstanceFailoverTestCase extends 
AbstractMasterIntegrationT
     @Autowired
     private SystemEventBus systemEventBus;
 
+    @Autowired
+    FailoverCoordinator failoverCoordinator;
+
     @Test
     public void testGlobalFailover_runningWorkflow_withSubmittedTasks() {
         final String yaml = 
"/it/failover/running_workflowInstance_with_one_submitted_fake_task.yaml";
@@ -633,46 +641,495 @@ public class WorkflowInstanceFailoverTestCase extends 
AbstractMasterIntegrationT
 
         await()
                 .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
                 .untilAsserted(() -> {
                     assertThat(repository.queryAllWorkflowInstance())
                             .hasSize(2)
-                            .anySatisfy(workflowInstance -> {
+                            .allSatisfy(workflowInstance -> {
                                 assertThat(workflowInstance.getState())
                                         
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
                             });
                 });
 
+        assertThat(repository.queryTaskInstance(mainWorkflow))
+                .hasSize(2)
+                .allSatisfy(taskInstance -> {
+                    assertThat(taskInstance.getState())
+                            .isEqualTo(taskInstance.getId() == 1 ? 
TaskExecutionStatus.NEED_FAULT_TOLERANCE
+                                    : TaskExecutionStatus.SUCCESS);
+                    assertThat(taskInstance.getName())
+                            .isEqualTo("sub_workflow_task");
+                });
+
+        assertThat(repository.queryTaskInstance(subWorkflow))
+                .hasSize(2)
+                .allSatisfy(taskInstance -> {
+                    assertThat(taskInstance.getState())
+                            .isEqualTo(taskInstance.getId() == 2 ? 
TaskExecutionStatus.NEED_FAULT_TOLERANCE
+                                    : TaskExecutionStatus.SUCCESS);
+                    assertThat(taskInstance.getName())
+                            .isEqualTo("fake_task_A");
+                });
+
+        assertThat(repository.queryAllTaskInstance()).hasSize(4);
+
+        masterContainer.assertAllResourceReleased();
+
+    }
+
+    @Test
+    public void 
testMasterFailover_runningWorkflow_takeOverSubWorkflowOnParentHealthy() {
+        final String yaml = 
"/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("workflow_with_one_sub_workflow_running")).findFirst()
+                .orElse(null);
+        final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_running")).findFirst().orElse(null);
+
+        final WorkflowInstance mainWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> workflow.getName()
+                        
.equals("workflow_with_one_sub_workflow_running-20250424180000000"))
+                .findFirst()
+                .orElse(null);
+        final WorkflowInstance subWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_running-20250424180000000")).findFirst()
+                .orElse(null);
+
+        assertThat(mainWorkflow).isNotNull();
+        assertThat(subWorkflow).isNotNull();
+        assertThat(mainWorkflowInstance).isNotNull();
+        assertThat(subWorkflowInstance).isNotNull();
+
+        MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(mainWorkflowInstance.getHost())
+                .build();
+        MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(subWorkflowInstance.getHost())
+                .build();
+
+        // first start workflow to simulate the normal parent workflow
+        systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new 
Date(), 0));
+
+        final String mainMasterFailoverNodePath = 
RegistryUtils.getFailoveredNodePath(
+                masterServerMain.getAddress(),
+                masterServerMain.getServerStartupTime(),
+                masterServerMain.getProcessId());
+        // wait failover main-workflow
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    
assertThat(registryClient.exists(mainMasterFailoverNodePath)).isTrue();
+                });
+        // wait main-workflow started
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryWorkflowInstance(mainWorkflow))
+                            .hasSize(1)
+                            .allSatisfy(workflowInstance -> {
+                                assertThat(workflowInstance.getState())
+                                        
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
+                            });
+                });
+
+        // wait sub-workflow-task started
         await()
                 .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
                 .untilAsserted(() -> {
                     assertThat(repository.queryTaskInstance(mainWorkflow))
                             .hasSize(2)
-                            .anySatisfy(taskInstance -> {
+                            .allSatisfy(taskInstance -> {
                                 assertThat(taskInstance.getState())
                                         .isEqualTo(taskInstance.getId() == 1 ? 
TaskExecutionStatus.NEED_FAULT_TOLERANCE
-                                                : TaskExecutionStatus.SUCCESS);
+                                                : 
TaskExecutionStatus.RUNNING_EXECUTION);
                                 assertThat(taskInstance.getName())
                                         .isEqualTo("sub_workflow_task");
                             });
                 });
 
+        // failover sub-workflow
+        systemEventBus.publish(MasterFailoverEvent.of(masterServerSub, new 
Date(), 0));
+
         await()
                 .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
                 .untilAsserted(() -> {
-                    assertThat(repository.queryTaskInstance(subWorkflow))
+                    assertThat(repository.queryAllWorkflowInstance())
                             .hasSize(2)
-                            .anySatisfy(taskInstance -> {
-                                assertThat(taskInstance.getState())
-                                        .isEqualTo(taskInstance.getId() == 2 ? 
TaskExecutionStatus.NEED_FAULT_TOLERANCE
-                                                : TaskExecutionStatus.SUCCESS);
-                                assertThat(taskInstance.getName())
-                                        .isEqualTo("fake_task_A");
+                            .allSatisfy(workflowInstance -> {
+                                assertThat(workflowInstance.getState())
+                                        
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
                             });
                 });
 
+        assertThat(repository.queryTaskInstance(mainWorkflow))
+                .hasSize(2)
+                .allSatisfy(taskInstance -> {
+                    assertThat(taskInstance.getState())
+                            .isEqualTo(taskInstance.getId() == 1 ? 
TaskExecutionStatus.NEED_FAULT_TOLERANCE
+                                    : TaskExecutionStatus.SUCCESS);
+                    assertThat(taskInstance.getName())
+                            .isEqualTo("sub_workflow_task");
+                });
+
+        assertThat(repository.queryTaskInstance(subWorkflow))
+                .hasSize(2)
+                .allSatisfy(taskInstance -> {
+                    assertThat(taskInstance.getState())
+                            .isEqualTo(taskInstance.getId() == 2 ? 
TaskExecutionStatus.NEED_FAULT_TOLERANCE
+                                    : TaskExecutionStatus.SUCCESS);
+                    assertThat(taskInstance.getName())
+                            .isEqualTo("fake_task_A");
+                });
+
         assertThat(repository.queryAllTaskInstance()).hasSize(4);
 
         masterContainer.assertAllResourceReleased();
 
     }
+
+    @Test
+    public void 
testMasterFailover_runningWorkflow_takeOverSubWorkflowOnChildHealthy() {
+        final String yaml = 
"/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("workflow_with_one_sub_workflow_running")).findFirst()
+                .orElse(null);
+        final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_running")).findFirst().orElse(null);
+
+        final WorkflowInstance mainWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> workflow.getName()
+                        
.equals("workflow_with_one_sub_workflow_running-20250424180000000"))
+                .findFirst()
+                .orElse(null);
+        final WorkflowInstance subWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_running-20250424180000000")).findFirst()
+                .orElse(null);
+
+        assertThat(mainWorkflow).isNotNull();
+        assertThat(subWorkflow).isNotNull();
+        assertThat(mainWorkflowInstance).isNotNull();
+        assertThat(subWorkflowInstance).isNotNull();
+
+        MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(mainWorkflowInstance.getHost())
+                .build();
+        MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(subWorkflowInstance.getHost())
+                .build();
+
+        // first start sub-workflow to simulate the normal child workflow
+        systemEventBus.publish(MasterFailoverEvent.of(masterServerSub, new 
Date(), 0));
+
+        final String subMasterFailoverNodePath = 
RegistryUtils.getFailoveredNodePath(
+                masterServerSub.getAddress(),
+                masterServerSub.getServerStartupTime(),
+                masterServerSub.getProcessId());
+        // wait failover sub-workflow
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    
assertThat(registryClient.exists(subMasterFailoverNodePath)).isTrue();
+                });
+
+        // failover main-workflow
+        systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new 
Date(), 0));
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryAllWorkflowInstance())
+                            .hasSize(2)
+                            .allSatisfy(workflowInstance -> {
+                                assertThat(workflowInstance.getState())
+                                        
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                            });
+                });
+
+        assertThat(repository.queryAllTaskInstance()).filteredOn(
+                taskInstance -> taskInstance.getId() > 2 && 
taskInstance.getState() == TaskExecutionStatus.SUCCESS)
+                .hasSize(2);
+
+        masterContainer.assertAllResourceReleased();
+    }
+
+    @Test
+    public void 
testMasterFailover_runningWorkflow_takeOverSubWorkflowOnChildNotHealthy() {
+        final String yaml = 
"/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("workflow_with_one_sub_workflows")).findFirst()
+                .orElse(null);
+        final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow")).findFirst().orElse(null);
+
+        final WorkflowInstance mainWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("workflow_with_sub_workflow_running-20250424180000000"))
+                .findFirst()
+                .orElse(null);
+        final WorkflowInstance submittedSubWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_submitted-20250424180000000")).findFirst()
+                .orElse(null);
+
+        assertThat(mainWorkflow).isNotNull();
+        assertThat(subWorkflow).isNotNull();
+        assertThat(mainWorkflowInstance).isNotNull();
+        assertThat(submittedSubWorkflowInstance).isNotNull();
+
+        MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(mainWorkflowInstance.getHost())
+                .build();
+        MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(submittedSubWorkflowInstance.getHost())
+                .build();
+
+        // first start workflow to simulate the normal parent workflow
+        systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new 
Date(), 0));
+
+        final String mainMasterFailoverNodePath = 
RegistryUtils.getFailoveredNodePath(
+                masterServerMain.getAddress(),
+                masterServerMain.getServerStartupTime(),
+                masterServerMain.getProcessId());
+        // wait failover main-workflow
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    
assertThat(registryClient.exists(mainMasterFailoverNodePath)).isTrue();
+                });
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryWorkflowInstance(1).getState())
+                            .isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                });
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryAllWorkflowInstance())
+                            .hasSize(3)
+                            .filteredOn(workflowInstance -> 
workflowInstance.getId() == 3)
+                            .allSatisfy(workflowInstance -> {
+                                assertThat(workflowInstance.getState())
+                                        
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                            });
+                });
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryAllTaskInstance())
+                            .hasSize(3)
+                            .filteredOn(taskInstance -> taskInstance.getId() > 
1)
+                            .allSatisfy(taskInstance -> {
+                                assertThat(taskInstance.getState())
+                                        
.isEqualTo(TaskExecutionStatus.SUCCESS);
+                            });
+                });
+
+        masterContainer.assertAllResourceReleased();
+
+    }
+
+    @Test
+    public void 
testMasterFailover_readyStopWorkflow_takeOverSubWorkflowOnChildNotHealthy() {
+        final String yaml = 
"/it/failover/readyStop_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("workflow_with_one_sub_workflows")).findFirst()
+                .orElse(null);
+        final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow")).findFirst().orElse(null);
+
+        final WorkflowInstance mainWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("workflow_with_sub_workflow_running-20250424180000000"))
+                .findFirst()
+                .orElse(null);
+        final WorkflowInstance submittedSubWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_submitted-20250424180000000")).findFirst()
+                .orElse(null);
+
+        final WorkflowInstance stopppedSubWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_stopped-20250424180000000")).findFirst()
+                .orElse(null);
+
+        final WorkflowInstance pausedSubWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_paused-20250424180000000")).findFirst()
+                .orElse(null);
+
+        assertThat(mainWorkflow).isNotNull();
+        assertThat(subWorkflow).isNotNull();
+        assertThat(mainWorkflowInstance).isNotNull();
+        assertThat(submittedSubWorkflowInstance).isNotNull();
+        assertThat(stopppedSubWorkflowInstance).isNotNull();
+        assertThat(pausedSubWorkflowInstance).isNotNull();
+
+        MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(mainWorkflowInstance.getHost())
+                .build();
+        MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(submittedSubWorkflowInstance.getHost())
+                .build();
+
+        systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new 
Date(), 0));
+        systemEventBus.publish(MasterFailoverEvent.of(masterServerSub, new 
Date(), 0));
+
+        final String mainMasterFailoverNodePath = 
RegistryUtils.getFailoveredNodePath(
+                masterServerMain.getAddress(),
+                masterServerMain.getServerStartupTime(),
+                masterServerMain.getProcessId());
+        // wait failover main-workflow
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    
assertThat(registryClient.exists(mainMasterFailoverNodePath)).isTrue();
+                });
+        // wait main-workflow stop
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryWorkflowInstance(mainWorkflow))
+                            .hasSize(1)
+                            .allSatisfy(workflowInstance -> {
+                                assertThat(workflowInstance.getState())
+                                        
.isEqualTo(WorkflowExecutionStatus.STOP);
+                            });
+                });
+
+        assertThat(repository.queryAllWorkflowInstance().size()).isEqualTo(4);
+        assertThat(repository.queryAllTaskInstance())
+                .hasSize(6)
+                .filteredOn(taskInstance -> taskInstance.getId() > 4)
+                .anySatisfy(taskInstance -> {
+                    assertThat(taskInstance.getState())
+                            .isEqualTo(TaskExecutionStatus.KILL);
+                });
+
+        masterContainer.assertAllResourceReleased();
+
+    }
+
+    @Test
+    public void 
testMasterFailover_readyPauseWorkflow_takeOverSubWorkflowOnChildNotHealthy() {
+        final String yaml =
+                
"/it/failover/readyPause_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition mainWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("workflow_with_one_sub_workflows")).findFirst()
+                .orElse(null);
+        final WorkflowDefinition subWorkflow = context.getWorkflows().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow")).findFirst().orElse(null);
+
+        final WorkflowInstance mainWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("workflow_with_sub_workflow_running-20250424180000000"))
+                .findFirst()
+                .orElse(null);
+        final WorkflowInstance submittedSubWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_submitted-20250424180000000")).findFirst()
+                .orElse(null);
+
+        final WorkflowInstance stopppedSubWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_stopped-20250424180000000")).findFirst()
+                .orElse(null);
+
+        final WorkflowInstance pausedSubWorkflowInstance = 
context.getWorkflowInstances().stream()
+                .filter(workflow -> 
workflow.getName().equals("sub_workflow_paused-20250424180000000")).findFirst()
+                .orElse(null);
+
+        assertThat(mainWorkflow).isNotNull();
+        assertThat(subWorkflow).isNotNull();
+        assertThat(mainWorkflowInstance).isNotNull();
+        assertThat(submittedSubWorkflowInstance).isNotNull();
+        assertThat(stopppedSubWorkflowInstance).isNotNull();
+        assertThat(pausedSubWorkflowInstance).isNotNull();
+
+        MasterServerMetadata masterServerMain = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(mainWorkflowInstance.getHost())
+                .build();
+        MasterServerMetadata masterServerSub = MasterServerMetadata.builder()
+                .cpuUsage(0.2)
+                .memoryUsage(0.4)
+                .serverStatus(ServerStatus.NORMAL)
+                .address(submittedSubWorkflowInstance.getHost())
+                .build();
+
+        systemEventBus.publish(MasterFailoverEvent.of(masterServerMain, new 
Date(), 0));
+        systemEventBus.publish(MasterFailoverEvent.of(masterServerSub, new 
Date(), 0));
+
+        final String mainMasterFailoverNodePath = 
RegistryUtils.getFailoveredNodePath(
+                masterServerMain.getAddress(),
+                masterServerMain.getServerStartupTime(),
+                masterServerMain.getProcessId());
+        // wait failover main-workflow
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    
assertThat(registryClient.exists(mainMasterFailoverNodePath)).isTrue();
+                });
+        // wait main-workflow stop
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .pollInterval(Duration.ofMillis(500))
+                .untilAsserted(() -> {
+                    assertThat(repository.queryWorkflowInstance(mainWorkflow))
+                            .hasSize(1)
+                            .allSatisfy(workflowInstance -> {
+                                assertThat(workflowInstance.getState())
+                                        
.isEqualTo(WorkflowExecutionStatus.PAUSE);
+                            });
+                });
+
+        assertThat(repository.queryAllWorkflowInstance().size()).isEqualTo(4);
+        assertThat(repository.queryAllTaskInstance())
+                .hasSize(6)
+                .filteredOn(taskInstance -> taskInstance.getId() > 4)
+                .allSatisfy(taskInstance -> {
+                    assertThat(taskInstance.getState())
+                            .isEqualTo(TaskExecutionStatus.PAUSE);
+                });
+
+        masterContainer.assertAllResourceReleased();
+
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
 
b/dolphinscheduler-master/src/test/resources/it/failover/readyPause_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
similarity index 53%
copy from 
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
copy to 
dolphinscheduler-master/src/test/resources/it/failover/readyPause_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
index 6d1e1e24ee..35b0231de3 100644
--- 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++ 
b/dolphinscheduler-master/src/test/resources/it/failover/readyPause_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
@@ -25,7 +25,7 @@ project:
   updateTime: 2025-04-24 00:00:00
 
 workflows:
-  - name: workflow_with_one_sub_workflow_running
+  - name: workflow_with_one_sub_workflows
     code: 1
     version: 1
     projectCode: 1
@@ -35,7 +35,7 @@ workflows:
     updateTime: 2025-04-24 00:00:00
     userId: 1
     executionType: PARALLEL
-  - name: sub_workflow_running
+  - name: sub_workflow
     code: 2
     version: 1
     projectCode: 1
@@ -47,7 +47,7 @@ workflows:
     executionType: PARALLEL
 
 tasks:
-  - name: sub_workflow_task
+  - name: sub_workflow_task_submitted
     code: 1
     version: 1
     projectCode: 1
@@ -58,31 +58,57 @@ tasks:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
-  - name: fake_task_A
+    taskPriority: MEDIUM
+  - name: sub_workflow_task_stopped
     code: 2
     version: 1
     projectCode: 1
     userId: 1
+    taskType: SUB_WORKFLOW
+    taskParams: 
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
+    workerGroup: default
+    createTime: 2025-04-24 00:00:00
+    updateTime: 2025-04-24 00:00:00
+    taskExecuteType: BATCH
+    taskPriority: MEDIUM
+  - name: sub_workflow_task_paused
+    code: 3
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: SUB_WORKFLOW
+    taskParams: 
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
+    workerGroup: default
+    createTime: 2025-04-24 00:00:00
+    updateTime: 2025-04-24 00:00:00
+    taskExecuteType: BATCH
+    taskPriority: MEDIUM
+  - name: fake_task_A
+    code: 4
+    version: 1
+    projectCode: 1
+    userId: 1
     taskType: LogicFakeTask
     taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
     workerGroup: default
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
+    taskPriority: MEDIUM
 
 
 workflowInstances:
   - id: 1
-    name: workflow_with_one_sub_workflow_running-20250424180000000
+    name: workflow_with_sub_workflow_running-20250424180000000
     workflowDefinitionCode: 1
     workflowDefinitionVersion: 1
     projectCode: 1
-    state: RUNNING_EXECUTION
+    state: READY_PAUSE
     recovery: NO
     startTime: 2025-04-24 18:00:00
     endTime: null
     runTimes: 1
-    host: 127.0.0.1:5678
+    host: 1.2.3.4:5678
     commandType: START_PROCESS
     commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
     taskDependType: TASK_POST
@@ -95,16 +121,60 @@ workflowInstances:
     varPool: '[]'
     dryRun: 0
   - id: 2
-    name: sub_workflow_running-20250424180000000
+    name: sub_workflow_submitted-20250424180000000
     workflowDefinitionCode: 2
     workflowDefinitionVersion: 1
     projectCode: 1
-    state: RUNNING_EXECUTION
+    state: SUBMITTED_SUCCESS
+    recovery: NO
+    startTime: 2025-04-24 18:00:00
+    endTime: null
+    runTimes: 1
+    host: 5.6.7.8:5678
+    commandType: START_PROCESS
+    commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+    taskDependType: TASK_POST
+    commandStartTime: 2025-04-24 18:00:00
+    isSubWorkflow: YES
+    executorId: 1
+    historyCmd: START_PROCESS
+    workerGroup: default
+    globalParams: '[]'
+    varPool: '[]'
+    dryRun: 0
+  - id: 3
+    name: sub_workflow_stopped-20250424180000000
+    workflowDefinitionCode: 2
+    workflowDefinitionVersion: 1
+    projectCode: 1
+    state: STOP
     recovery: NO
     startTime: 2025-04-24 18:00:00
     endTime: null
     runTimes: 1
-    host: 127.0.0.1:5678
+    host: 5.6.7.8:5678
+    commandType: START_PROCESS
+    commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+    taskDependType: TASK_POST
+    commandStartTime: 2025-04-24 18:00:00
+    isSubWorkflow: YES
+    executorId: 1
+    historyCmd: START_PROCESS
+    workerGroup: default
+    globalParams: '[]'
+    varPool: '[]'
+    dryRun: 0
+  - id: 4
+    name: sub_workflow_paused-20250424180000000
+    workflowDefinitionCode: 2
+    workflowDefinitionVersion: 1
+    projectCode: 1
+    state: PAUSE
+    recovery: NO
+    startTime: 2025-04-24 18:00:00
+    endTime: null
+    runTimes: 1
+    host: 5.6.7.8:5678
     commandType: START_PROCESS
     commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
     taskDependType: TASK_POST
@@ -120,7 +190,7 @@ workflowInstances:
 
 taskInstances:
   - id: 1
-    name: sub_workflow_task
+    name: sub_workflow_task_submitted
     taskType: SUB_WORKFLOW
     workflowInstanceId: 1
     workflowInstanceName: 
workflow_with_one_sub_workflow_running-20240816071251690
@@ -143,13 +213,14 @@ taskInstances:
     varPool: '[]'
     taskExecuteType: BATCH
     appLink: '{"subWorkflowInstanceId":2}'
+    taskInstancePriority: MEDIUM
   - id: 2
-    name: fake_task_A
-    taskType: LogicFakeTask
-    workflowInstanceId: 2
+    name: sub_workflow_task_stopped
+    taskType: SUB_WORKFLOW
+    workflowInstanceId: 1
     workflowInstanceName: 
workflow_with_one_sub_workflow_running-20240816071251690
     projectCode: 1
-    taskCode: 2
+    taskCode: 1
     taskDefinitionVersion: 1
     state: RUNNING_EXECUTION
     firstSubmitTime: 2025-04-24 18:00:00
@@ -158,7 +229,7 @@ taskInstances:
     retryTimes: 0
     host: 127.0.0.1:1234
     maxRetryTimes: 0
-    taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
+    taskParams: '{"localParams":null,"varPool":[],"workflowDefinitionCode":1}'
     flag: YES
     retryInterval: 0
     delayTime: 0
@@ -166,6 +237,33 @@ taskInstances:
     executorId: 1
     varPool: '[]'
     taskExecuteType: BATCH
+    appLink: '{"subWorkflowInstanceId":3}'
+    taskInstancePriority: MEDIUM
+  - id: 3
+    name: sub_workflow_task_paused
+    taskType: SUB_WORKFLOW
+    workflowInstanceId: 1
+    workflowInstanceName: 
workflow_with_one_sub_workflow_running-20240816071251690
+    projectCode: 1
+    taskCode: 1
+    taskDefinitionVersion: 1
+    state: RUNNING_EXECUTION
+    firstSubmitTime: 2025-04-24 18:00:00
+    submitTime: 2025-04-24 18:00:00
+    startTime: 2025-04-24 18:00:00
+    retryTimes: 0
+    host: 127.0.0.1:1234
+    maxRetryTimes: 0
+    taskParams: '{"localParams":null,"varPool":[],"workflowDefinitionCode":1}'
+    flag: YES
+    retryInterval: 0
+    delayTime: 0
+    workerGroup: default
+    executorId: 1
+    varPool: '[]'
+    taskExecuteType: BATCH
+    appLink: '{"subWorkflowInstanceId":4}'
+    taskInstancePriority: MEDIUM
 
 taskRelations:
   - projectCode: 1
@@ -178,7 +276,7 @@ taskRelations:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
   - projectCode: 1
-    workflowDefinitionCode: 2
+    workflowDefinitionCode: 1
     workflowDefinitionVersion: 1
     preTaskCode: 0
     preTaskVersion: 0
@@ -186,3 +284,21 @@ taskRelations:
     postTaskVersion: 1
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 3
+    postTaskVersion: 1
+    createTime: 2025-04-24 00:00:00
+    updateTime: 2025-04-24 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 2
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 4
+    postTaskVersion: 1
+    createTime: 2025-04-24 00:00:00
+    updateTime: 2025-04-24 00:00:00
diff --git 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
 
b/dolphinscheduler-master/src/test/resources/it/failover/readyStop_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
similarity index 53%
copy from 
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
copy to 
dolphinscheduler-master/src/test/resources/it/failover/readyStop_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
index 6d1e1e24ee..3d81ed4dc5 100644
--- 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++ 
b/dolphinscheduler-master/src/test/resources/it/failover/readyStop_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
@@ -25,7 +25,7 @@ project:
   updateTime: 2025-04-24 00:00:00
 
 workflows:
-  - name: workflow_with_one_sub_workflow_running
+  - name: workflow_with_one_sub_workflows
     code: 1
     version: 1
     projectCode: 1
@@ -35,7 +35,7 @@ workflows:
     updateTime: 2025-04-24 00:00:00
     userId: 1
     executionType: PARALLEL
-  - name: sub_workflow_running
+  - name: sub_workflow
     code: 2
     version: 1
     projectCode: 1
@@ -47,7 +47,7 @@ workflows:
     executionType: PARALLEL
 
 tasks:
-  - name: sub_workflow_task
+  - name: sub_workflow_task_submitted
     code: 1
     version: 1
     projectCode: 1
@@ -58,31 +58,57 @@ tasks:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
-  - name: fake_task_A
+    taskPriority: MEDIUM
+  - name: sub_workflow_task_stopped
     code: 2
     version: 1
     projectCode: 1
     userId: 1
+    taskType: SUB_WORKFLOW
+    taskParams: 
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
+    workerGroup: default
+    createTime: 2025-04-24 00:00:00
+    updateTime: 2025-04-24 00:00:00
+    taskExecuteType: BATCH
+    taskPriority: MEDIUM
+  - name: sub_workflow_task_paused
+    code: 3
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: SUB_WORKFLOW
+    taskParams: 
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
+    workerGroup: default
+    createTime: 2025-04-24 00:00:00
+    updateTime: 2025-04-24 00:00:00
+    taskExecuteType: BATCH
+    taskPriority: MEDIUM
+  - name: fake_task_A
+    code: 4
+    version: 1
+    projectCode: 1
+    userId: 1
     taskType: LogicFakeTask
     taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
     workerGroup: default
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
+    taskPriority: MEDIUM
 
 
 workflowInstances:
   - id: 1
-    name: workflow_with_one_sub_workflow_running-20250424180000000
+    name: workflow_with_sub_workflow_running-20250424180000000
     workflowDefinitionCode: 1
     workflowDefinitionVersion: 1
     projectCode: 1
-    state: RUNNING_EXECUTION
+    state: READY_STOP
     recovery: NO
     startTime: 2025-04-24 18:00:00
     endTime: null
     runTimes: 1
-    host: 127.0.0.1:5678
+    host: 1.2.3.4:5678
     commandType: START_PROCESS
     commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
     taskDependType: TASK_POST
@@ -95,16 +121,60 @@ workflowInstances:
     varPool: '[]'
     dryRun: 0
   - id: 2
-    name: sub_workflow_running-20250424180000000
+    name: sub_workflow_submitted-20250424180000000
     workflowDefinitionCode: 2
     workflowDefinitionVersion: 1
     projectCode: 1
-    state: RUNNING_EXECUTION
+    state: SUBMITTED_SUCCESS
+    recovery: NO
+    startTime: 2025-04-24 18:00:00
+    endTime: null
+    runTimes: 1
+    host: 5.6.7.8:5678
+    commandType: START_PROCESS
+    commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+    taskDependType: TASK_POST
+    commandStartTime: 2025-04-24 18:00:00
+    isSubWorkflow: YES
+    executorId: 1
+    historyCmd: START_PROCESS
+    workerGroup: default
+    globalParams: '[]'
+    varPool: '[]'
+    dryRun: 0
+  - id: 3
+    name: sub_workflow_stopped-20250424180000000
+    workflowDefinitionCode: 2
+    workflowDefinitionVersion: 1
+    projectCode: 1
+    state: STOP
     recovery: NO
     startTime: 2025-04-24 18:00:00
     endTime: null
     runTimes: 1
-    host: 127.0.0.1:5678
+    host: 5.6.7.8:5678
+    commandType: START_PROCESS
+    commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
+    taskDependType: TASK_POST
+    commandStartTime: 2025-04-24 18:00:00
+    isSubWorkflow: YES
+    executorId: 1
+    historyCmd: START_PROCESS
+    workerGroup: default
+    globalParams: '[]'
+    varPool: '[]'
+    dryRun: 0
+  - id: 4
+    name: sub_workflow_paused-20250424180000000
+    workflowDefinitionCode: 2
+    workflowDefinitionVersion: 1
+    projectCode: 1
+    state: PAUSE
+    recovery: NO
+    startTime: 2025-04-24 18:00:00
+    endTime: null
+    runTimes: 1
+    host: 5.6.7.8:5678
     commandType: START_PROCESS
     commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
     taskDependType: TASK_POST
@@ -120,7 +190,7 @@ workflowInstances:
 
 taskInstances:
   - id: 1
-    name: sub_workflow_task
+    name: sub_workflow_task_submitted
     taskType: SUB_WORKFLOW
     workflowInstanceId: 1
     workflowInstanceName: 
workflow_with_one_sub_workflow_running-20240816071251690
@@ -143,13 +213,14 @@ taskInstances:
     varPool: '[]'
     taskExecuteType: BATCH
     appLink: '{"subWorkflowInstanceId":2}'
+    taskInstancePriority: MEDIUM
   - id: 2
-    name: fake_task_A
-    taskType: LogicFakeTask
-    workflowInstanceId: 2
+    name: sub_workflow_task_stopped
+    taskType: SUB_WORKFLOW
+    workflowInstanceId: 1
     workflowInstanceName: 
workflow_with_one_sub_workflow_running-20240816071251690
     projectCode: 1
-    taskCode: 2
+    taskCode: 1
     taskDefinitionVersion: 1
     state: RUNNING_EXECUTION
     firstSubmitTime: 2025-04-24 18:00:00
@@ -158,7 +229,7 @@ taskInstances:
     retryTimes: 0
     host: 127.0.0.1:1234
     maxRetryTimes: 0
-    taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
+    taskParams: '{"localParams":null,"varPool":[],"workflowDefinitionCode":1}'
     flag: YES
     retryInterval: 0
     delayTime: 0
@@ -166,6 +237,33 @@ taskInstances:
     executorId: 1
     varPool: '[]'
     taskExecuteType: BATCH
+    appLink: '{"subWorkflowInstanceId":3}'
+    taskInstancePriority: MEDIUM
+  - id: 3
+    name: sub_workflow_task_paused
+    taskType: SUB_WORKFLOW
+    workflowInstanceId: 1
+    workflowInstanceName: 
workflow_with_one_sub_workflow_running-20240816071251690
+    projectCode: 1
+    taskCode: 1
+    taskDefinitionVersion: 1
+    state: RUNNING_EXECUTION
+    firstSubmitTime: 2025-04-24 18:00:00
+    submitTime: 2025-04-24 18:00:00
+    startTime: 2025-04-24 18:00:00
+    retryTimes: 0
+    host: 127.0.0.1:1234
+    maxRetryTimes: 0
+    taskParams: '{"localParams":null,"varPool":[],"workflowDefinitionCode":1}'
+    flag: YES
+    retryInterval: 0
+    delayTime: 0
+    workerGroup: default
+    executorId: 1
+    varPool: '[]'
+    taskExecuteType: BATCH
+    appLink: '{"subWorkflowInstanceId":4}'
+    taskInstancePriority: MEDIUM
 
 taskRelations:
   - projectCode: 1
@@ -178,7 +276,7 @@ taskRelations:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
   - projectCode: 1
-    workflowDefinitionCode: 2
+    workflowDefinitionCode: 1
     workflowDefinitionVersion: 1
     preTaskCode: 0
     preTaskVersion: 0
@@ -186,3 +284,21 @@ taskRelations:
     postTaskVersion: 1
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 3
+    postTaskVersion: 1
+    createTime: 2025-04-24 00:00:00
+    updateTime: 2025-04-24 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 2
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 4
+    postTaskVersion: 1
+    createTime: 2025-04-24 00:00:00
+    updateTime: 2025-04-24 00:00:00
diff --git 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
 
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
similarity index 78%
copy from 
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
copy to 
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
index 6d1e1e24ee..a97c4f4e45 100644
--- 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++ 
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_not_running_in_diff_master.yaml
@@ -25,7 +25,7 @@ project:
   updateTime: 2025-04-24 00:00:00
 
 workflows:
-  - name: workflow_with_one_sub_workflow_running
+  - name: workflow_with_one_sub_workflows
     code: 1
     version: 1
     projectCode: 1
@@ -35,7 +35,7 @@ workflows:
     updateTime: 2025-04-24 00:00:00
     userId: 1
     executionType: PARALLEL
-  - name: sub_workflow_running
+  - name: sub_workflow
     code: 2
     version: 1
     projectCode: 1
@@ -47,7 +47,7 @@ workflows:
     executionType: PARALLEL
 
 tasks:
-  - name: sub_workflow_task
+  - name: sub_workflow_task_submitted
     code: 1
     version: 1
     projectCode: 1
@@ -58,8 +58,9 @@ tasks:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
+    taskPriority: MEDIUM
   - name: fake_task_A
-    code: 2
+    code: 4
     version: 1
     projectCode: 1
     userId: 1
@@ -69,11 +70,12 @@ tasks:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
+    taskPriority: MEDIUM
 
 
 workflowInstances:
   - id: 1
-    name: workflow_with_one_sub_workflow_running-20250424180000000
+    name: workflow_with_sub_workflow_running-20250424180000000
     workflowDefinitionCode: 1
     workflowDefinitionVersion: 1
     projectCode: 1
@@ -82,7 +84,7 @@ workflowInstances:
     startTime: 2025-04-24 18:00:00
     endTime: null
     runTimes: 1
-    host: 127.0.0.1:5678
+    host: 1.2.3.4:5678
     commandType: START_PROCESS
     commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
     taskDependType: TASK_POST
@@ -95,16 +97,16 @@ workflowInstances:
     varPool: '[]'
     dryRun: 0
   - id: 2
-    name: sub_workflow_running-20250424180000000
+    name: sub_workflow_submitted-20250424180000000
     workflowDefinitionCode: 2
     workflowDefinitionVersion: 1
     projectCode: 1
-    state: RUNNING_EXECUTION
+    state: SUBMITTED_SUCCESS
     recovery: NO
     startTime: 2025-04-24 18:00:00
     endTime: null
     runTimes: 1
-    host: 127.0.0.1:5678
+    host: 5.6.7.8:5678
     commandType: START_PROCESS
     commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
     taskDependType: TASK_POST
@@ -120,7 +122,7 @@ workflowInstances:
 
 taskInstances:
   - id: 1
-    name: sub_workflow_task
+    name: sub_workflow_task_submitted
     taskType: SUB_WORKFLOW
     workflowInstanceId: 1
     workflowInstanceName: 
workflow_with_one_sub_workflow_running-20240816071251690
@@ -143,29 +145,7 @@ taskInstances:
     varPool: '[]'
     taskExecuteType: BATCH
     appLink: '{"subWorkflowInstanceId":2}'
-  - id: 2
-    name: fake_task_A
-    taskType: LogicFakeTask
-    workflowInstanceId: 2
-    workflowInstanceName: 
workflow_with_one_sub_workflow_running-20240816071251690
-    projectCode: 1
-    taskCode: 2
-    taskDefinitionVersion: 1
-    state: RUNNING_EXECUTION
-    firstSubmitTime: 2025-04-24 18:00:00
-    submitTime: 2025-04-24 18:00:00
-    startTime: 2025-04-24 18:00:00
-    retryTimes: 0
-    host: 127.0.0.1:1234
-    maxRetryTimes: 0
-    taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls ."}'
-    flag: YES
-    retryInterval: 0
-    delayTime: 0
-    workerGroup: default
-    executorId: 1
-    varPool: '[]'
-    taskExecuteType: BATCH
+    taskInstancePriority: MEDIUM
 
 taskRelations:
   - projectCode: 1
@@ -182,7 +162,7 @@ taskRelations:
     workflowDefinitionVersion: 1
     preTaskCode: 0
     preTaskVersion: 0
-    postTaskCode: 2
+    postTaskCode: 4
     postTaskVersion: 1
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
diff --git 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
 
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
index 6d1e1e24ee..1f858d9991 100644
--- 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++ 
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
@@ -58,6 +58,7 @@ tasks:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
+    taskPriority: MEDIUM
   - name: fake_task_A
     code: 2
     version: 1
@@ -69,6 +70,7 @@ tasks:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
+    taskPriority: MEDIUM
 
 
 workflowInstances:
@@ -143,6 +145,7 @@ taskInstances:
     varPool: '[]'
     taskExecuteType: BATCH
     appLink: '{"subWorkflowInstanceId":2}'
+    taskInstancePriority: MEDIUM
   - id: 2
     name: fake_task_A
     taskType: LogicFakeTask
@@ -166,6 +169,7 @@ taskInstances:
     executorId: 1
     varPool: '[]'
     taskExecuteType: BATCH
+    taskInstancePriority: MEDIUM
 
 taskRelations:
   - projectCode: 1
diff --git 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
 
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml
similarity index 93%
copy from 
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
copy to 
dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml
index 6d1e1e24ee..06a59e5bdc 100644
--- 
a/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running.yaml
+++ 
b/dolphinscheduler-master/src/test/resources/it/failover/running_workflowInstance_with_sub_workflow_task_running_in_diff_master.yaml
@@ -58,6 +58,7 @@ tasks:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
+    taskPriority: MEDIUM
   - name: fake_task_A
     code: 2
     version: 1
@@ -69,6 +70,7 @@ tasks:
     createTime: 2025-04-24 00:00:00
     updateTime: 2025-04-24 00:00:00
     taskExecuteType: BATCH
+    taskPriority: MEDIUM
 
 
 workflowInstances:
@@ -82,7 +84,7 @@ workflowInstances:
     startTime: 2025-04-24 18:00:00
     endTime: null
     runTimes: 1
-    host: 127.0.0.1:5678
+    host: 1.2.3.4:5678
     commandType: START_PROCESS
     commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
     taskDependType: TASK_POST
@@ -104,7 +106,7 @@ workflowInstances:
     startTime: 2025-04-24 18:00:00
     endTime: null
     runTimes: 1
-    host: 127.0.0.1:5678
+    host: 5.6.7.8:5678
     commandType: START_PROCESS
     commandParam: 
'{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
     taskDependType: TASK_POST
@@ -143,6 +145,7 @@ taskInstances:
     varPool: '[]'
     taskExecuteType: BATCH
     appLink: '{"subWorkflowInstanceId":2}'
+    taskInstancePriority: MEDIUM
   - id: 2
     name: fake_task_A
     taskType: LogicFakeTask
@@ -166,6 +169,7 @@ taskInstances:
     executorId: 1
     varPool: '[]'
     taskExecuteType: BATCH
+    taskInstancePriority: MEDIUM
 
 taskRelations:
   - projectCode: 1

Reply via email to