liunaijie commented on issue #15683: URL: https://github.com/apache/dolphinscheduler/issues/15683#issuecomment-2713994425
I found the root cause. It is a status update issue that involves two classes.

#### WorkflowExecuteThreadPool

When a `sub_process` job finishes (success or failure), it calls the `notifyProcessChanged` method:

https://github.com/apache/dolphinscheduler/blob/3.2.0-release/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java#L146

This method has two issues:

1. The `crossWorkflowParameterPassing` method throws a `NullPointerException`. This has no functional impact; it only prints an error log.

   ```
   private void crossWorkflowParameterPassing(ProcessInstance finishProcessInstance, TaskInstance taskInstance) {
       try {
           // The task is removed from the holder once it finishes, so this lookup returns null here
           MasterTaskExecuteRunnable masterTaskExecuteRunnable =
                   MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId());
           masterTaskExecuteRunnable.getILogicTask().getTaskParameters()
                   .setVarPool(finishProcessInstance.getVarPool());
           log.info("Cross workflow parameter passing success, finishProcessInstanceId: {}, taskInstanceId: {}",
                   finishProcessInstance.getId(), taskInstance.getId());
       } catch (Exception ex) {
           log.error("Cross workflow parameter passing error, finishProcessInstanceId: {}, taskInstanceId: {}",
                   finishProcessInstance.getId(), taskInstance.getId(), ex);
       }
   }
   ```

2. The `notifyProcess` and `notifyMyself` methods use different processing logic. I am not sure why; I think only the notify address should differ between them. **_That is why the problem cannot be reproduced in standalone mode or with a single master._**

When `notifyProcess` is called, it generates a `WorkflowStateEventChangeRequest` message and sends it to the other master node.
The generated message is:

```
WorkflowStateEventChangeRequest workflowStateEventChangeRequest = new WorkflowStateEventChangeRequest(
        finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(),
        taskInstance.getId());
```

Note that `sourceTaskInstanceId` is set to 0 and `destTaskInstanceId` is set to the `sub_process` task instance id.

#### StateEventProcessor

When the message reaches the other master node, that node calls the `StateEventProcessor.process()` method:

https://github.com/apache/dolphinscheduler/blob/3.2.0-release/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java#L51

```
public void process(Channel channel, Message message) {
    WorkflowStateEventChangeRequest workflowStateEventChangeRequest =
            JSONUtils.parseObject(message.getBody(), WorkflowStateEventChangeRequest.class);
    StateEvent stateEvent;
    if (workflowStateEventChangeRequest.getDestTaskInstanceId() == 0) {
        stateEvent = createWorkflowStateEvent(workflowStateEventChangeRequest);
    } else {
        stateEvent = createTaskStateEvent(workflowStateEventChangeRequest);
    }

    try (
            final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC(
                    stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId())) {
        log.info("Received state change command, event: {}", stateEvent);
        stateEventResponseService.addStateChangeEvent(stateEvent);
    }
}
```

When it receives the message above (`sourceTaskInstanceId = 0`, `destTaskInstanceId = sub_process_task_instance_id`), `destTaskInstanceId` is not 0, so it calls the `createTaskStateEvent` method and passes the resulting event to the next step.
```
private TaskStateEvent createTaskStateEvent(WorkflowStateEventChangeRequest workflowStateEventChangeRequest) {
    return TaskStateEvent.builder()
            .processInstanceId(workflowStateEventChangeRequest.getDestProcessInstanceId())
            .taskInstanceId(workflowStateEventChangeRequest.getDestTaskInstanceId())
            .type(StateEventType.TASK_STATE_CHANGE)
            .key(workflowStateEventChangeRequest.getKey())
            .build();
}
```

This method creates a `TaskStateEvent` that describes the status of the `sub_process` task. The event type is `StateEventType.TASK_STATE_CHANGE`, but the `status` field is not set.

`StateEventResponseService` then receives and processes this event. The call stack is:

```
StateEventResponseService.addStateChangeEvent(stateEvent);
WorkflowExecuteThreadPool.submitStateEvent(stateEvent)
WorkflowExecuteThread.addStateEvent(stateEvent)
WorkflowExecuteRunnable.handleEvents()
```

The event then reaches the `TaskStateEventHandler.handleStateEvent()` method, which checks the `status` field:

```
if (taskStateEvent == null || taskStateEvent.getStatus() == null) {
    // the event is broken
    log.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
    return;
}
```

So this event is never processed and is never removed from the queue (`WorkflowExecuteRunnable.stateEvents`).

#### How to fix

From the above, we can see that the problem is in this method:
```
private TaskStateEvent createTaskStateEvent(WorkflowStateEventChangeRequest workflowStateEventChangeRequest) {
    return TaskStateEvent.builder()
            .processInstanceId(workflowStateEventChangeRequest.getDestProcessInstanceId())
            .taskInstanceId(workflowStateEventChangeRequest.getDestTaskInstanceId())
            .type(StateEventType.TASK_STATE_CHANGE)
            .key(workflowStateEventChangeRequest.getKey())
            .build();
}
```

In my fix, I set `status` from the workflow execution status (this may not cover every case, but it works in most cases):

```
private TaskStateEvent createTaskStateEvent(WorkflowStateEventChangeRequest workflowStateEventChangeRequest) {
    return TaskStateEvent.builder()
            .processInstanceId(workflowStateEventChangeRequest.getDestProcessInstanceId())
            .taskInstanceId(workflowStateEventChangeRequest.getDestTaskInstanceId())
            .type(StateEventType.TASK_STATE_CHANGE)
            // Derive the task status from the finished sub-workflow's status
            .status(workflowExecutionStatusToTaskExecutionStatus(workflowStateEventChangeRequest.getSourceStatus()))
            .key(workflowStateEventChangeRequest.getKey())
            .build();
}

private TaskExecutionStatus workflowExecutionStatusToTaskExecutionStatus(WorkflowExecutionStatus workflowExecutionStatus) {
    switch (workflowExecutionStatus) {
        case SUCCESS:
            return TaskExecutionStatus.SUCCESS;
        case FAILURE:
            return TaskExecutionStatus.FAILURE;
        case STOP:
            return TaskExecutionStatus.STOP;
        case RUNNING_EXECUTION:
            return TaskExecutionStatus.RUNNING_EXECUTION;
        default:
            // Any status not mapped above is treated as still running
            log.info("Received un-expected workflow execution status: {}", workflowExecutionStatus);
            return TaskExecutionStatus.RUNNING_EXECUTION;
    }
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
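To make the effect of the fix easier to check in isolation, here is a minimal, self-contained sketch. It is not DolphinScheduler code: `StatusMappingSketch`, the trimmed-down enums, the two-field `TaskStateEvent`, and the `isHandleable` helper are all hypothetical stand-ins. They only mimic the status mapping from the fix and the `status == null` guard quoted from `TaskStateEventHandler.handleStateEvent()`.

```java
// Hypothetical sketch; none of these types are the real DolphinScheduler classes.
class StatusMappingSketch {

    // Simplified stand-ins for the real enums (only a few values are modelled).
    enum WorkflowExecutionStatus { SUBMITTED_SUCCESS, RUNNING_EXECUTION, STOP, FAILURE, SUCCESS }

    enum TaskExecutionStatus { RUNNING_EXECUTION, STOP, FAILURE, SUCCESS }

    // Simplified stand-in for TaskStateEvent: just an id and a nullable status.
    static final class TaskStateEvent {
        final int taskInstanceId;
        final TaskExecutionStatus status; // null models the event built before the fix

        TaskStateEvent(int taskInstanceId, TaskExecutionStatus status) {
            this.taskInstanceId = taskInstanceId;
            this.status = status;
        }
    }

    // Same mapping idea as the fix: unmapped statuses fall back to RUNNING_EXECUTION.
    static TaskExecutionStatus toTaskStatus(WorkflowExecutionStatus workflowStatus) {
        switch (workflowStatus) {
            case SUCCESS:
                return TaskExecutionStatus.SUCCESS;
            case FAILURE:
                return TaskExecutionStatus.FAILURE;
            case STOP:
                return TaskExecutionStatus.STOP;
            default:
                return TaskExecutionStatus.RUNNING_EXECUTION;
        }
    }

    // Mirrors the guard in the quoted handler: a null event or null status is "broken".
    static boolean isHandleable(TaskStateEvent event) {
        return event != null && event.status != null;
    }

    public static void main(String[] args) {
        // Before the fix: the event is built without a status, so the guard rejects it.
        TaskStateEvent before = new TaskStateEvent(42, null);
        // After the fix: the status is derived from the sub-workflow's final status.
        TaskStateEvent after = new TaskStateEvent(42, toTaskStatus(WorkflowExecutionStatus.FAILURE));

        System.out.println("without status, handleable: " + isHandleable(before));
        System.out.println("with status, handleable: " + isHandleable(after));
    }
}
```

In this sketch the fallback branch has the same limitation noted in the fix: any workflow status that is not mapped explicitly is reported as `RUNNING_EXECUTION`.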
