Copilot commented on code in PR #17955:
URL:
https://github.com/apache/dolphinscheduler/pull/17955#discussion_r2767884150
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java:
##########
@@ -81,36 +82,44 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work
}
@Override
- protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
+ protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler(
+ final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
+ // Capture the context needed for deferred graph assembly
final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();
- final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
- final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
- final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
- TaskExecutionRunnableBuilder
- .builder()
- .workflowExecutionGraph(workflowExecutionGraph)
- .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
- .project(workflowExecuteContextBuilder.getProject())
- .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
- .taskDefinition(workflowGraph.getTaskNodeByName(task))
- .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
- .applicationContext(applicationContext)
+ final WorkflowDefinition workflowDefinition = workflowExecuteContextBuilder.getWorkflowDefinition();
+ final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();
+ final List<String> startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder);
+
+ return () -> {
+ final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
+ final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
+ final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
+ TaskExecutionRunnableBuilder
+ .builder()
+ .workflowExecutionGraph(workflowExecutionGraph)
+ .workflowDefinition(workflowDefinition)
+ .project(workflowExecuteContextBuilder.getProject())
+ .workflowInstance(workflowInstance)
+ .taskDefinition(workflowGraph.getTaskNodeByName(task))
+ .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
+ .applicationContext(applicationContext)
+ .build();
Review Comment:
The lambda returned by this method captures the
workflowExecuteContextBuilder reference. This is problematic because the lambda
may be invoked later (during WorkflowStartLifecycleEvent), by which time the
builder may have been modified or reused. Inside the lambda, at lines 101-105,
getProject() and getWorkflowEventBus() are still called on the builder. Capture
these values as final local variables before creating the lambda, as is already
done for workflowDefinition, workflowInstance, and startNodes, so that the
lambda does not read stale or modified builder state.
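
As an illustration of the difference, here is a minimal, self-contained sketch.
MutableContextBuilder and CaptureDemo are hypothetical stand-ins invented for
this example; they are not the real WorkflowExecuteContextBuilder or handler
classes, and the String field only stands in for the project object.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a mutable builder; not the real WorkflowExecuteContextBuilder.
class MutableContextBuilder {
    private String project;

    String getProject() {
        return project;
    }

    void setProject(final String project) {
        this.project = project;
    }
}

class CaptureDemo {

    // Captures the builder itself: getProject() is evaluated when the supplier runs.
    static Supplier<String> lateRead(final MutableContextBuilder builder) {
        return () -> builder.getProject();
    }

    // Reads the value first: the supplier holds the reference seen at creation time.
    static Supplier<String> earlyRead(final MutableContextBuilder builder) {
        final String project = builder.getProject();
        return () -> project;
    }

    public static void main(String[] args) {
        final MutableContextBuilder builder = new MutableContextBuilder();
        builder.setProject("project-a");

        final Supplier<String> late = lateRead(builder);
        final Supplier<String> early = earlyRead(builder);

        // The builder changes before the deferred call happens.
        builder.setProject("project-b");

        System.out.println(late.get());  // prints "project-b"
        System.out.println(early.get()); // prints "project-a"
    }
}
```

Note that capturing a local variable only fixes which object the lambda refers
to. If that object is itself mutable, its contents can still change before the
lambda runs.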
##########
deploy/kubernetes/dolphinscheduler/Chart.yaml:
##########
@@ -61,6 +61,6 @@ dependencies:
repository: https://raw.githubusercontent.com/bitnami/charts/archive-full-index/bitnami
condition: mysql.enabled
- name: minio
- version: 11.10.13
+ version: 11.10.26
Review Comment:
The PR title and description say this change updates the MinIO Helm chart
version from 11.10.13 to 11.10.26. However, the PR also contains extensive Java
changes that are unrelated to that update: they refactor workflow execution
graph assembly to use deferred initialization through
IWorkflowExecutionGraphAssembler. Because this is a significant architectural
change to the workflow engine, the Java changes should go in a separate pull
request.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java:
##########
@@ -49,12 +48,65 @@ public class WorkflowExecuteContext implements IWorkflowExecuteContext {
private final IWorkflowGraph workflowGraph;
- private final IWorkflowExecutionGraph workflowExecutionGraph;
+ private volatile IWorkflowExecutionGraph workflowExecutionGraph;
+
Review Comment:
This method overrides IWorkflowExecuteContext.getWorkflowExecutionGraph; add an
@Override annotation.
```suggestion
    @Override
    public IWorkflowExecutionGraph getWorkflowExecutionGraph() {
        return workflowExecutionGraph;
    }
```
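
For context, a minimal sketch of a lazily assigned volatile field with an
annotated getter follows. ExecutionGraph, ExecuteContext, LazyExecuteContext,
and the setter are simplified, hypothetical names for this example only; the
real interfaces in the PR have more members and may assign the field
differently.

```java
// Hypothetical, simplified types; the real interfaces in the PR have more members.
interface ExecutionGraph {
}

interface ExecuteContext {
    ExecutionGraph getWorkflowExecutionGraph();
}

class LazyExecuteContext implements ExecuteContext {

    // volatile so that a graph assigned by one thread is visible to readers on another.
    private volatile ExecutionGraph workflowExecutionGraph;

    // Hypothetical setter standing in for whatever assigns the graph after deferred assembly.
    void setWorkflowExecutionGraph(final ExecutionGraph graph) {
        this.workflowExecutionGraph = graph;
    }

    // With @Override, the compiler rejects this method if it stops matching the interface.
    @Override
    public ExecutionGraph getWorkflowExecutionGraph() {
        return workflowExecutionGraph;
    }

    public static void main(String[] args) {
        final LazyExecuteContext context = new LazyExecuteContext();
        System.out.println(context.getWorkflowExecutionGraph() == null); // prints "true"

        context.setWorkflowExecutionGraph(new ExecutionGraph() {
        });
        System.out.println(context.getWorkflowExecutionGraph() != null); // prints "true"
    }
}
```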
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java:
##########
@@ -94,48 +96,51 @@ protected void assembleWorkflowInstance(
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}
- /**
- * Generate the workflow execution graph.
- * <p> Will rebuild the WorkflowExecutionGraph from the exist task instance.
- */
@Override
- protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
- final Map<String, TaskInstance> taskInstanceMap =
- getValidTaskInstance(workflowExecuteContextBuilder.getWorkflowInstance())
- .stream()
- .collect(Collectors.toMap(TaskInstance::getName, Function.identity()));
-
+ protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler(
+ final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
+ // Capture the context needed for deferred graph assembly
final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();
- final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
-
- final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
- final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
- TaskExecutionRunnableBuilder
- .builder()
- .workflowExecutionGraph(workflowExecutionGraph)
- .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
- .project(workflowExecuteContextBuilder.getProject())
- .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
- .taskDefinition(workflowGraph.getTaskNodeByName(task))
- .taskInstance(taskInstanceMap.get(task))
- .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
- .applicationContext(applicationContext)
+ final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();
+ final List<String> startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder);
+
+ return () -> {
+ final Map<String, TaskInstance> taskInstanceMap =
+ getValidTaskInstance(workflowInstance)
+ .stream()
+ .collect(Collectors.toMap(TaskInstance::getName, Function.identity()));
+
+ final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
+
+ final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
+ final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
+ TaskExecutionRunnableBuilder
+ .builder()
+ .workflowExecutionGraph(workflowExecutionGraph)
+ .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
+ .project(workflowExecuteContextBuilder.getProject())
+ .workflowInstance(workflowInstance)
+ .taskDefinition(workflowGraph.getTaskNodeByName(task))
+ .taskInstance(taskInstanceMap.get(task))
+ .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
+ .applicationContext(applicationContext)
+ .build();
+ workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder));
Review Comment:
The lambda returned by this method captures the
workflowExecuteContextBuilder reference. This is problematic because the lambda
may be invoked later (during WorkflowStartLifecycleEvent), by which time the
builder may have been modified or reused. Inside the lambda, at lines 116-126,
getWorkflowDefinition(), getProject(), and getWorkflowEventBus() are still
called on the builder. Capture these values as final local variables before
creating the lambda so that it does not read stale or modified builder state.
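
When several values are needed, one option is to copy them into a small
immutable holder before the lambda is created. The sketch below assumes
hypothetical ContextBuilder, AssemblyInputs, and AssemblerFactory classes with
String fields; none of these exist in the PR, and the real values would be the
workflow definition, project, and event bus objects.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the mutable builder; field names are illustrative only.
class ContextBuilder {
    String definition;
    String project;
    String eventBus;
}

// Immutable snapshot of the values the deferred assembler needs.
final class AssemblyInputs {
    final String definition;
    final String project;
    final String eventBus;

    private AssemblyInputs(final String definition, final String project, final String eventBus) {
        this.definition = definition;
        this.project = project;
        this.eventBus = eventBus;
    }

    // Reads every getter-equivalent once, at the time the assembler is created.
    static AssemblyInputs from(final ContextBuilder builder) {
        return new AssemblyInputs(builder.definition, builder.project, builder.eventBus);
    }
}

class AssemblerFactory {

    // The returned supplier references only the snapshot, never the builder.
    static Supplier<String> createAssembler(final ContextBuilder builder) {
        final AssemblyInputs inputs = AssemblyInputs.from(builder);
        return () -> inputs.definition + "/" + inputs.project + "/" + inputs.eventBus;
    }

    public static void main(String[] args) {
        final ContextBuilder builder = new ContextBuilder();
        builder.definition = "definition-1";
        builder.project = "project-1";
        builder.eventBus = "bus-1";

        final Supplier<String> assembler = createAssembler(builder);

        // Reusing the builder afterwards does not affect the assembler.
        builder.project = "project-2";

        System.out.println(assembler.get()); // prints "definition-1/project-1/bus-1"
    }
}
```

The holder copies references only, so this protects against the builder being
reassigned or reused, not against mutation of the objects it points to.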
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java:
##########
@@ -100,48 +101,51 @@ protected void assembleWorkflowInstance(
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}
- /**
- * Generate the workflow execution graph.
- * <p> Will clear the history failure/killed task.
- * <p> If the task's predecessors exist failure/killed, will also mark the task as failure/killed.
- */
@Override
- protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
- final Map<String, TaskInstance> taskInstanceMap = dealWithHistoryTaskInstances(workflowExecuteContextBuilder)
- .stream()
- .collect(Collectors.toMap(TaskInstance::getName, Function.identity()));
-
+ protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler(
+ final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
+ // Capture the context needed for deferred graph assembly
final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();
- final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
-
- final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
- final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
- TaskExecutionRunnableBuilder
- .builder()
- .workflowExecutionGraph(workflowExecutionGraph)
- .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
- .project(workflowExecuteContextBuilder.getProject())
- .workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
- .taskDefinition(workflowGraph.getTaskNodeByName(task))
- .taskInstance(taskInstanceMap.get(task))
- .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
- .applicationContext(applicationContext)
+ final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();
+ final List<String> startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder);
+
+ return () -> {
+ final Map<String, TaskInstance> taskInstanceMap = dealWithHistoryTaskInstances(
+ workflowExecuteContextBuilder)
+ .stream()
+ .collect(Collectors.toMap(TaskInstance::getName, Function.identity()));
+
+ final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
+
+ final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
+ final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
+ TaskExecutionRunnableBuilder
+ .builder()
+ .workflowExecutionGraph(workflowExecutionGraph)
+ .workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
+ .project(workflowExecuteContextBuilder.getProject())
+ .workflowInstance(workflowInstance)
+ .taskDefinition(workflowGraph.getTaskNodeByName(task))
+ .taskInstance(taskInstanceMap.get(task))
+ .workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
+ .applicationContext(applicationContext)
+ .build();
Review Comment:
The lambda returned by this method captures the
workflowExecuteContextBuilder reference. This is problematic because the lambda
may be invoked later (during WorkflowStartLifecycleEvent), by which time the
builder may have been modified or reused. Inside the lambda, at lines 113-116
and lines 125-131, getWorkflowDefinition(), getProject(), and
getWorkflowEventBus() are still called on the builder. Capture these values as
final local variables before creating the lambda so that it does not read stale
or modified builder state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.