This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7150b42a3c [Fix-17453] Fix TASK_ONLY strategy cannot work (#17461)
7150b42a3c is described below
commit 7150b42a3c388b8a19a21a305bf5b2e20d3f7bed
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Sep 15 18:34:06 2025 +0800
[Fix-17453] Fix TASK_ONLY strategy cannot work (#17461)
---
.../master/command/RunWorkflowCommandParam.java | 7 ++
.../handler/RecoverFailureTaskCommandHandler.java | 1 +
.../command/handler/RunWorkflowCommandHandler.java | 1 +
.../handler/WorkflowFailoverCommandHandler.java | 1 +
.../engine/graph/IWorkflowExecutionGraph.java | 5 ++
.../engine/graph/WorkflowExecutionGraph.java | 23 +++++++
.../master/integration/WorkflowOperator.java | 7 +-
.../integration/cases/WorkflowStartTestCase.java | 33 +++++++++
.../it/start/workflow_with_task_only_strategy.yaml | 80 ++++++++++++++++++++++
9 files changed, 157 insertions(+), 1 deletion(-)
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/command/RunWorkflowCommandParam.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/command/RunWorkflowCommandParam.java
index 960a4f7177..7ebc7347ff 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/command/RunWorkflowCommandParam.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/command/RunWorkflowCommandParam.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.extract.master.command;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import java.util.List;
+
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@@ -35,4 +37,9 @@ public class RunWorkflowCommandParam extends
AbstractCommandParam {
return CommandType.START_PROCESS;
}
+ public RunWorkflowCommandParam withStartNodes(List<Long> startNodes) {
+ this.startNodes = startNodes;
+ return this;
+ }
+
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
index 7ef570eab8..92f5fb4ed0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java
@@ -139,6 +139,7 @@ public class RecoverFailureTaskCommandHandler extends
AbstractCommandHandler {
.doVisitFunction(taskExecutionRunnableCreator)
.build();
workflowGraphTopologyLogicalVisitor.visit();
+ workflowExecutionGraph.removeUnReachableEdge();
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
index 2e1bf83562..a127ae6cea 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java
@@ -116,6 +116,7 @@ public class RunWorkflowCommandHandler extends
AbstractCommandHandler {
.doVisitFunction(taskExecutionRunnableCreator)
.build();
workflowGraphTopologyLogicalVisitor.visit();
+ workflowExecutionGraph.removeUnReachableEdge();
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
index 093d61e26e..8348375b6e 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java
@@ -137,6 +137,7 @@ public class WorkflowFailoverCommandHandler extends
AbstractCommandHandler {
.doVisitFunction(taskExecutionRunnableCreator)
.build();
workflowGraphTopologyLogicalVisitor.visit();
+ workflowExecutionGraph.removeUnReachableEdge();
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
index fdbbb581da..f94641b4a3 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java
@@ -40,6 +40,11 @@ public interface IWorkflowExecutionGraph {
*/
void addEdge(final String fromTaskName, final Set<String> toTaskName);
+ /**
+ * Remove all edges whose source or target task is not contained in the graph.
+ */
+ void removeUnReachableEdge();
+
/**
* Return the start tasks, the start tasks in the workflow execution graph
is the tasks which predecessors is empty.
*/
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
index 0fa4d15cac..c21dd08c13 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java
@@ -28,13 +28,16 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
public class WorkflowExecutionGraph implements IWorkflowExecutionGraph {
+ // All task execution runnables in the execution graph, keyed by task name.
private final Map<String, ITaskExecutionRunnable>
totalTaskExecuteRunnableMap;
private final Set<String> failureTaskChains;
@@ -78,6 +81,26 @@ public class WorkflowExecutionGraph implements
IWorkflowExecutionGraph {
toTaskNames.forEach(toTask -> predecessors.computeIfAbsent(toTask, k
-> new HashSet<>()).add(fromTaskName));
}
+ @Override
+ public void removeUnReachableEdge() {
+ // A node that is missing from totalTaskExecuteRunnableMap is not executable,
+ // so remove every edge in successors/predecessors that refers to it.
+ Consumer<Map<String, Set<String>>> removeUnReachableEdge = edgeMap -> {
+ final Iterator<Map.Entry<String, Set<String>>> iterator =
edgeMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Set<String>> entry = iterator.next();
+ if (!totalTaskExecuteRunnableMap.containsKey(entry.getKey())) {
+ iterator.remove();
+ continue;
+ }
+ Set<String> toTasks = entry.getValue();
+ toTasks.removeIf(toTask ->
!totalTaskExecuteRunnableMap.containsKey(toTask));
+ }
+ };
+ removeUnReachableEdge.accept(successors);
+ removeUnReachableEdge.accept(predecessors);
+ }
+
@Override
public List<ITaskExecutionRunnable> getStartNodes() {
return totalTaskExecuteRunnableMap.values()
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
index c133e29a0d..b3bf673b1f 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.integration;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -62,7 +63,8 @@ public class WorkflowOperator {
.workflowDefinitionVersion(workflowTriggerDTO.workflowDefinition.getVersion())
.startNodes(workflowTriggerDTO.getRunWorkflowCommandParam().getStartNodes())
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
- .dryRun(workflowTriggerDTO.dryRun)
+ .dryRun(workflowTriggerDTO.getDryRun())
+ .taskDependType(workflowTriggerDTO.getTaskDependType())
.build();
final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@@ -150,6 +152,9 @@ public class WorkflowOperator {
@Builder.Default
private Flag dryRun = Flag.NO;
+
+ @Builder.Default
+ private TaskDependType taskDependType = TaskDependType.TASK_POST;
}
@Data
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 530b6b7d6f..b0acc01449 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -21,6 +21,7 @@ import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -1035,4 +1036,36 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
});
masterContainer.assertAllResourceReleased();
}
+
+ @Test
+ @DisplayName("Test start a workflow with task depend type TASK_ONLY")
+ public void testStartWorkflow_withTaskOnlyStrategy() {
+ final String yaml = "/it/start/workflow_with_task_only_strategy.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new
RunWorkflowCommandParam().withStartNodes(Lists.newArrayList(1L)))
+ .taskDependType(TaskDependType.TASK_ONLY)
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance ->
assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.SUCCESS));
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .satisfiesExactly(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_task_only_strategy.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_task_only_strategy.yaml
new file mode 100644
index 0000000000..1ff9e24173
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_task_only_strategy.yaml
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_two_serial_fake_task_success
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with two serial tasks
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: B
+ code: 2
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}'
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2021-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 1
+ preTaskVersion: 1
+ postTaskCode: 2
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00