ruanwenjun commented on code in PR #17821:
URL:
https://github.com/apache/dolphinscheduler/pull/17821#discussion_r2679171904
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java:
##########
@@ -109,7 +110,14 @@ public void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRu
taskInstance.getDelayTime(),
remainTimeMills);
}
- taskExecutionRunnable.initializeTaskExecutionContext();
+
+ try {
+ taskExecutionRunnable.initializeTaskExecutionContext();
+ } catch (Exception ex) {
+ log.error("Failed to initialize task execution context, taskName: {}", taskInstance.getName(), ex);
Review Comment:
Do not catch all exceptions here; at the very least,
`DataAccessResourceFailureException` should be filtered out.
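
One way to read this request, as a minimal sketch rather than the project's actual code: let the database-connectivity exception propagate so the caller can retry, and treat only the remaining exceptions as fatal. The class `InitContextSketch`, the method `initializeOrReportFatal`, and the `onFatal` callback are hypothetical names, and the nested exception is a stand-in for Spring's `org.springframework.dao.DataAccessResourceFailureException` so the example compiles without Spring on the classpath.

```java
import java.util.function.Consumer;

public class InitContextSketch {

    // Hypothetical stand-in for Spring's DataAccessResourceFailureException,
    // declared here only so the sketch is self-contained.
    public static class DataAccessResourceFailureException extends RuntimeException {
        public DataAccessResourceFailureException(String message) {
            super(message);
        }
    }

    /**
     * Runs the initializer and reports whether it succeeded.
     * A DataAccessResourceFailureException is rethrown untouched (assumed to be
     * transient, e.g. the database is unreachable); any other exception is passed
     * to onFatal and the method returns false.
     */
    public static boolean initializeOrReportFatal(Runnable initializer, Consumer<Exception> onFatal) {
        try {
            initializer.run();
            return true;
        } catch (DataAccessResourceFailureException ex) {
            // Not a task-level failure: leave it to the caller's retry handling.
            throw ex;
        } catch (Exception ex) {
            // Everything else is treated as a fatal initialization failure.
            onFatal.accept(ex);
            return false;
        }
    }
}
```

In the PR itself, the `onFatal` callback would correspond to publishing the fatal event; whether the resource-failure case should be rethrown or handled some other way is an assumption about the reviewer's intent.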
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java:
##########
@@ -128,6 +131,19 @@ private void doFireSingleWorkflowEventBus(final IWorkflowExecutionRunnable workf
ThreadUtils.sleep(5_000);
return;
}
+
+ // If task initializeTaskExecutionContext before dispatch is failed
+ // construct and publish a dedicated TaskFatalLifecycleEvent
+ // so that the event will be handled by TaskFatalLifecycleEventHandler
Review Comment:
```suggestion
```
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java:
##########
@@ -99,6 +100,38 @@ protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable
}
}
+ @Override
+ public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
+ final ITaskExecutionRunnable taskExecutionRunnable,
+ final TaskFatalLifecycleEvent taskFatalEvent) {
+ releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
+ persistentTaskInstanceFatalEventToDB(taskExecutionRunnable, taskFatalEvent);
+
+ if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
+ taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
+ return;
+ }
+
+ // If all successors are condition tasks, then the task will not be marked as failure.
+ // And the DAG will continue to execute.
+ final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
+ if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
+ mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
+ publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
+ return;
+ }
+ taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
Review Comment:
This needs to deal with the workflow failure strategy.
--
This is an automated message from the Apache Git Service.