liunaijie commented on issue #15683:
URL: 
https://github.com/apache/dolphinscheduler/issues/15683#issuecomment-2713994425

   I found the root cause: a status update issue that involves two classes.
   
   
   
   #### WorkflowExecuteThreadPool
   
   When a `sub_process` job finishes (success or failure), it calls the `notifyProcessChanged` method:
   https://github.com/apache/dolphinscheduler/blob/3.2.0-release/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java#L146
   
   
   In this method, there are 2 issues:
   
   1. The `crossWorkflowParameterPassing` method throws a `NullPointerException`. This has no functional impact; it only prints an error log.
   
   ```
       private void crossWorkflowParameterPassing(ProcessInstance finishProcessInstance, TaskInstance taskInstance) {
           try {
               // When the task has finished, it is removed from the holder, so this lookup returns null
               MasterTaskExecuteRunnable masterTaskExecuteRunnable =
                       MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId());
               masterTaskExecuteRunnable.getILogicTask().getTaskParameters()
                       .setVarPool(finishProcessInstance.getVarPool());
               log.info("Cross workflow parameter passing success, finishProcessInstanceId: {}, taskInstanceId: {}",
                       finishProcessInstance.getId(), taskInstance.getId());
           } catch (Exception ex) {
               log.error("Cross workflow parameter passing error, finishProcessInstanceId: {}, taskInstanceId: {}",
                       finishProcessInstance.getId(), taskInstance.getId(), ex);
           }
       }
   ```
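
The failure mode above can be modelled in isolation. This is only a minimal sketch: `HolderLookupSketch`, `HOLDER`, and `lookup` are hypothetical stand-ins for the real holder class, and the null-safe lookup is one possible way to avoid the error log, not what the project does.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class HolderLookupSketch {

    // Hypothetical stand-in for the runnable holder: an entry is removed when its task finishes.
    static final Map<Integer, String> HOLDER = new ConcurrentHashMap<>();

    // Returns the registered runnable name, or empty if the entry has already been removed.
    static Optional<String> lookup(int taskInstanceId) {
        return Optional.ofNullable(HOLDER.get(taskInstanceId));
    }

    public static void main(String[] args) {
        HOLDER.put(1, "sub_process-runnable");
        HOLDER.remove(1); // the task finished, so its entry is gone

        // Calling a method on HOLDER.get(1) directly would throw NullPointerException here.
        String result = lookup(1)
                .map(runnable -> "passed varPool to " + runnable)
                .orElse("skipped: runnable already removed");
        System.out.println(result);
    }
}
```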
   
   2. The `notifyProcess` and `notifyMyself` methods
   
   I am not sure why these two methods use different processing logic. I think only the notify address should differ, but the processing logic differs as well.
   **_That is why the issue cannot be reproduced in standalone mode or with a single master._**
   
   
   When `notifyProcess` is called, it generates a `WorkflowStateEventChangeRequest` message and sends it to the other master node.
   The generated message is:
   ```
   WorkflowStateEventChangeRequest workflowStateEventChangeRequest = new WorkflowStateEventChangeRequest(
                   finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(),
                   taskInstance.getId());
   ```
   Note that `sourceTaskInstanceId` is set to 0 and `destTaskInstanceId` is set to the `sub_process` task instance id.
   
   
   
   #### StateEventProcessor
   
   When the message is sent to the other master node, that node calls the `StateEventProcessor.process()` method:
   
https://github.com/apache/dolphinscheduler/blob/3.2.0-release/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java#L51
   
   
   ```
       public void process(Channel channel, Message message) {
           WorkflowStateEventChangeRequest workflowStateEventChangeRequest =
                   JSONUtils.parseObject(message.getBody(), WorkflowStateEventChangeRequest.class);
           StateEvent stateEvent;
           if (workflowStateEventChangeRequest.getDestTaskInstanceId() == 0) {
               stateEvent = createWorkflowStateEvent(workflowStateEventChangeRequest);
           } else {
               stateEvent = createTaskStateEvent(workflowStateEventChangeRequest);
           }
   
           try (
                   final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC(
                           stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId())) {
               log.info("Received state change command, event: {}", stateEvent);
               stateEventResponseService.addStateChangeEvent(stateEvent);
           }
   
       }
   ```
   
   
   When the message described above is received (`sourceTaskInstanceId` = 0, `destTaskInstanceId` = the `sub_process` task instance id), `process()` calls the `createTaskStateEvent` method and passes the resulting event to the next step.
   
   
   ```
       private TaskStateEvent createTaskStateEvent(WorkflowStateEventChangeRequest workflowStateEventChangeRequest) {
           return TaskStateEvent.builder()
                   .processInstanceId(workflowStateEventChangeRequest.getDestProcessInstanceId())
                   .taskInstanceId(workflowStateEventChangeRequest.getDestTaskInstanceId())
                   .type(StateEventType.TASK_STATE_CHANGE)
                   .key(workflowStateEventChangeRequest.getKey())
                   .build();
       }
   ```
   
   This method creates a `TaskStateEvent` that describes the `sub_process` task status. The event type is `StateEventType.TASK_STATE_CHANGE`, but the `status` field is not set.
   
   `StateEventResponseService` then receives this event and processes it.
   
   
   The call stack is:
   ```
   StateEventResponseService.addStateChangeEvent(stateEvent);
   
   
   WorkflowExecuteThreadPool.submitStateEvent(stateEvent)
   
   
   WorkflowExecuteThread.addStateEvent(stateEvent)
   
   
   WorkflowExecuteRunnable.handleEvents()
   ```
   
   
   The event then reaches the `TaskStateEventHandler.handleStateEvent()` method.
   
   
   This class checks the `status` field:
   ```
           if (taskStateEvent == null || taskStateEvent.getStatus() == null) {
               // the event is broken
               log.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
               return;
           }
   ```
   
   
   So this event is never processed and is never removed from the queue (`WorkflowExecuteRunnable.stateEvents`).
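
To illustrate the effect, here is a simplified model of a queue whose head is only removed after it has been handled successfully. The class `StuckEventSketch`, its `Event` type, and the peek-then-remove loop shape are assumptions made for this sketch, not the actual `WorkflowExecuteRunnable` code; only the null check on `status` comes from the snippet quoted above.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class StuckEventSketch {

    // Hypothetical, simplified event: status stays null when the builder never sets it.
    static final class Event {
        final int taskInstanceId;
        final String status;

        Event(int taskInstanceId, String status) {
            this.taskInstanceId = taskInstanceId;
            this.status = status;
        }
    }

    // Mirrors the quoted null check: an event without a status is treated as broken.
    static boolean handle(Event event) {
        return event != null && event.status != null;
    }

    // Assumed loop shape: peek at the head and remove it only when the handler succeeds.
    // Returns the number of events still queued after at most maxRounds attempts.
    static int drain(ConcurrentLinkedQueue<Event> queue, int maxRounds) {
        for (int round = 0; round < maxRounds && !queue.isEmpty(); round++) {
            Event head = queue.peek();
            if (handle(head)) {
                queue.remove(head);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();
        queue.add(new Event(42, null));      // built without a status
        queue.add(new Event(43, "SUCCESS")); // well-formed, but queued behind the broken head
        System.out.println("remaining after 10 rounds: " + drain(queue, 10));
    }
}
```

In this model the broken head is retried on every round and never leaves the queue, so later events queued behind it are never reached either.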
   
   
   #### How to fix
   
   From the above, we can see that the problem is in this method:
   
   ```
       private TaskStateEvent createTaskStateEvent(WorkflowStateEventChangeRequest workflowStateEventChangeRequest) {
           return TaskStateEvent.builder()
                   .processInstanceId(workflowStateEventChangeRequest.getDestProcessInstanceId())
                   .taskInstanceId(workflowStateEventChangeRequest.getDestTaskInstanceId())
                   .type(StateEventType.TASK_STATE_CHANGE)
                   .key(workflowStateEventChangeRequest.getKey())
                   .build();
       }
   ```
   
   In my fix, I set the `status` from the workflow execution status (this may not cover all cases, but it works well in most cases).
   
   ```
       private TaskStateEvent createTaskStateEvent(WorkflowStateEventChangeRequest workflowStateEventChangeRequest) {
           return TaskStateEvent.builder()
                   .processInstanceId(workflowStateEventChangeRequest.getDestProcessInstanceId())
                   .taskInstanceId(workflowStateEventChangeRequest.getDestTaskInstanceId())
                   .type(StateEventType.TASK_STATE_CHANGE)
                   .status(workflowExecutionStatusToTaskExecutionStatus(workflowStateEventChangeRequest.getSourceStatus()))
                   .key(workflowStateEventChangeRequest.getKey())
                   .build();
       }
   
       private TaskExecutionStatus workflowExecutionStatusToTaskExecutionStatus(WorkflowExecutionStatus workflowExecutionStatus) {
           switch (workflowExecutionStatus) {
               case SUCCESS:
                   return TaskExecutionStatus.SUCCESS;
               case FAILURE:
                   return TaskExecutionStatus.FAILURE;
               case STOP:
                   return TaskExecutionStatus.STOP;
               case RUNNING_EXECUTION:
                   return TaskExecutionStatus.RUNNING_EXECUTION;
               default:
                   log.info("Received un-expected workflow execution status: {}", workflowExecutionStatus);
                   return TaskExecutionStatus.RUNNING_EXECUTION;
           }
       }
   
   ```
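
The mapping can be tried on its own with the standalone sketch below. The two enums are reduced, hypothetical copies that exist only for this example (`PAUSE` is included just to exercise the default branch), and `toTaskStatus` is a renamed copy of the conversion method above without logging.

```java
public class StatusMappingSketch {

    // Reduced, hypothetical copies of the two status enums; the real ones have more constants.
    enum WorkflowExecutionStatus { SUCCESS, FAILURE, STOP, RUNNING_EXECUTION, PAUSE }

    enum TaskExecutionStatus { SUCCESS, FAILURE, STOP, RUNNING_EXECUTION }

    // Same branching as the fix: known states map one-to-one, anything else falls back to running.
    static TaskExecutionStatus toTaskStatus(WorkflowExecutionStatus status) {
        switch (status) {
            case SUCCESS:
                return TaskExecutionStatus.SUCCESS;
            case FAILURE:
                return TaskExecutionStatus.FAILURE;
            case STOP:
                return TaskExecutionStatus.STOP;
            case RUNNING_EXECUTION:
                return TaskExecutionStatus.RUNNING_EXECUTION;
            default:
                return TaskExecutionStatus.RUNNING_EXECUTION;
        }
    }

    public static void main(String[] args) {
        for (WorkflowExecutionStatus status : WorkflowExecutionStatus.values()) {
            System.out.println(status + " -> " + toTaskStatus(status));
        }
    }
}
```

Because the result is never null, an event built with this status passes the `status` check in the handler. Whether the fallback to a running state is right for every unmapped workflow state is the open question noted above.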
   

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

