caishunfeng commented on code in PR #16288:
URL: 
https://github.com/apache/dolphinscheduler/pull/16288#discussion_r1671566282


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java:
##########
@@ -427,4 +427,9 @@ ProcessDefinition updateSingleProcessDefinition(User 
loginUser,
      * @return variables data
      */
     Map<String, Object> viewVariables(User loginUser, long projectCode, long 
code);
+
+    void saveProcessLineage(List<TaskDefinitionLog> taskDefinitionLogList,
+                            long projectCode,
+                            long processDefinitionCode,
+                            int processDefinitionVersion);

Review Comment:
   Order the parameters from the broadest scope to the narrowest.
   ```suggestion
       void saveProcessLineage(long projectCode,
                               long processDefinitionCode,  int 
processDefinitionVersion, 
                               List<TaskDefinitionLog> taskDefinitionLogList);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -239,13 +243,15 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     private DataSourceMapper dataSourceMapper;
 
     @Autowired
-    private WorkFlowLineageService workFlowLineageService;
+    private ProcessLineageService processLineageService;
 
     @Autowired
     private MetricsCleanUpService metricsCleanUpService;
 
     @Autowired
     private ListenerEventAlertManager listenerEventAlertManager;
+    @Autowired
+    private ProcessLineageMapper processLineageMapper;

Review Comment:
   It's better to use `ProcessLineageDao` in the service.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessLineageServiceImpl.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProcessLineageService;
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
+import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessLineageMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+/**
+ * work flow lineage service impl
+ */
+@Slf4j
+@Service
+public class ProcessLineageServiceImpl extends BaseServiceImpl implements 
ProcessLineageService {
+
+    @Autowired
+    private ProjectMapper projectMapper;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private ProcessLineageMapper processLineageMapper;
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Override
+    public List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long 
projectCode, String processDefinitionName) {
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
+        }
+        return processLineageMapper.queryWorkFlowLineageByName(projectCode, 
processDefinitionName);
+    }
+
+    @Override
+    public Map<String, Object> queryWorkFlowLineageByCode(long projectCode, 
long processDefinitionCode) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            log.error("Project does not exist, projectCode:{}.", projectCode);
+            putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
+            return result;
+        }
+        List<ProcessLineage> upstreamProcessLineageList =
+                
processLineageMapper.queryByProcessDefinitionCode(processDefinitionCode);
+        List<ProcessLineage> downstreamProcessLineageList =
+                processLineageMapper.queryWorkFlowLineageByDept(projectCode, 
processDefinitionCode, 0);
+        List<ProcessLineage> totalProcessLineageList =
+                Stream.of(upstreamProcessLineageList, 
downstreamProcessLineageList)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+
+        List<WorkFlowRelation> workFlowRelationList = 
getWorkFlowRelations(totalProcessLineageList);
+        List<WorkFlowRelationDetail> workFlowRelationDetailList =
+                getWorkflowRelationDetails(totalProcessLineageList.stream()
+                        .flatMap(pl -> {
+                            List<Long> processDefinitionCodes = new 
ArrayList<>();
+                            
processDefinitionCodes.add(pl.getProcessDefinitionCode());
+                            
processDefinitionCodes.add(pl.getDeptProcessDefinitionCode());
+                            return processDefinitionCodes.stream();
+                        }).distinct().collect(Collectors.toList()));
+
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_RELATION_DETAIL_LIST, 
workFlowRelationDetailList);
+        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, 
workFlowRelationList);
+        result.put(Constants.DATA_LIST, workFlowLists);
+        putMsg(result, Status.SUCCESS);

Review Comment:
   We should avoid using `Result` in the service layer; it's unfriendly to 
other service calls.
   
   The same applies in other places.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -407,11 +413,65 @@ protected Map<String, Object> createDagDefine(User 
loginUser,
                     processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
         }
 
+        saveProcessLineage(taskDefinitionLogs, 
processDefinition.getProjectCode(), processDefinition.getCode(),
+                insertVersion);
+
         putMsg(result, Status.SUCCESS);
         result.put(Constants.DATA_LIST, processDefinition);
         return result;
     }
 
+    @Override
+    public void saveProcessLineage(List<TaskDefinitionLog> 
taskDefinitionLogList,
+                                   long projectCode,
+                                   long processDefinitionCode,
+                                   int processDefinitionVersion) {
+        List<ProcessLineage> processLineageList =
+                generateProcessLineageList(taskDefinitionLogList, 
processDefinitionCode, processDefinitionVersion);
+        if (!processLineageList.isEmpty()) {
+            int insertProcessLineageResult = 
processLineageService.updateProcessLineage(processLineageList);
+            if (insertProcessLineageResult <= 0) {
+                log.error(
+                        "Save process lineage error, projectCode: {}, 
processDefinitionCode: {}, processDefinitionVersion: {}",
+                        projectCode, processDefinitionCode, 
processDefinitionVersion);
+                throw new 
ServiceException(Status.CREATE_PROCESS_LINEAGE_ERROR);
+            } else {
+                log.info(
+                        "Save process lineage complete, projectCode: {}, 
processDefinitionCode: {}, processDefinitionVersion: {}",
+                        projectCode, processDefinitionCode, 
processDefinitionVersion);
+            }
+        }

Review Comment:
   ```suggestion
           if (processLineageList.isEmpty()) {
                return;
           }
           int insertProcessLineageResult = 
processLineageService.updateProcessLineage(processLineageList);
           if (insertProcessLineageResult <= 0) {
               log.error(
                       "Save process lineage error, projectCode: {}, 
processDefinitionCode: {}, processDefinitionVersion: {}",
                       projectCode, processDefinitionCode, 
processDefinitionVersion);
               throw new ServiceException(Status.CREATE_PROCESS_LINEAGE_ERROR); 
           
           } 
           log.info(
                           "Save process lineage complete, projectCode: {}, 
processDefinitionCode: {}, processDefinitionVersion: {}",
                           projectCode, processDefinitionCode, 
processDefinitionVersion);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -407,11 +413,65 @@ protected Map<String, Object> createDagDefine(User 
loginUser,
                     processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
         }
 
+        saveProcessLineage(taskDefinitionLogs, 
processDefinition.getProjectCode(), processDefinition.getCode(),
+                insertVersion);
+
         putMsg(result, Status.SUCCESS);
         result.put(Constants.DATA_LIST, processDefinition);
         return result;
     }
 
+    @Override
+    public void saveProcessLineage(List<TaskDefinitionLog> 
taskDefinitionLogList,
+                                   long projectCode,
+                                   long processDefinitionCode,
+                                   int processDefinitionVersion) {
+        List<ProcessLineage> processLineageList =
+                generateProcessLineageList(taskDefinitionLogList, 
processDefinitionCode, processDefinitionVersion);
+        if (!processLineageList.isEmpty()) {
+            int insertProcessLineageResult = 
processLineageService.updateProcessLineage(processLineageList);
+            if (insertProcessLineageResult <= 0) {
+                log.error(
+                        "Save process lineage error, projectCode: {}, 
processDefinitionCode: {}, processDefinitionVersion: {}",
+                        projectCode, processDefinitionCode, 
processDefinitionVersion);
+                throw new 
ServiceException(Status.CREATE_PROCESS_LINEAGE_ERROR);
+            } else {
+                log.info(
+                        "Save process lineage complete, projectCode: {}, 
processDefinitionCode: {}, processDefinitionVersion: {}",
+                        projectCode, processDefinitionCode, 
processDefinitionVersion);
+            }
+        }
+
+    }
+
+    private List<ProcessLineage> 
generateProcessLineageList(List<TaskDefinitionLog> taskDefinitionLogList,
+                                                            long 
processDefinitionCode,
+                                                            int 
processDefinitionVersion) {
+        List<ProcessLineage> processLineageList = new ArrayList<>();
+        for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
+
+            if 
(TaskTypeUtils.isDependentTask(taskDefinitionLog.getTaskType())) {

Review Comment:
   ```suggestion
               if 
(!TaskTypeUtils.isDependentTask(taskDefinitionLog.getTaskType())) {
                    continue;
               }
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessLineageServiceImpl.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProcessLineageService;
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
+import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessLineageMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+/**
+ * work flow lineage service impl
+ */
+@Slf4j
+@Service
+public class ProcessLineageServiceImpl extends BaseServiceImpl implements 
ProcessLineageService {
+
+    @Autowired
+    private ProjectMapper projectMapper;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private ProcessLineageMapper processLineageMapper;
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Override
+    public List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long 
projectCode, String processDefinitionName) {
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
+        }
+        return processLineageMapper.queryWorkFlowLineageByName(projectCode, 
processDefinitionName);
+    }
+
+    @Override
+    public Map<String, Object> queryWorkFlowLineageByCode(long projectCode, 
long processDefinitionCode) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            log.error("Project does not exist, projectCode:{}.", projectCode);
+            putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
+            return result;
+        }
+        List<ProcessLineage> upstreamProcessLineageList =
+                
processLineageMapper.queryByProcessDefinitionCode(processDefinitionCode);
+        List<ProcessLineage> downstreamProcessLineageList =
+                processLineageMapper.queryWorkFlowLineageByDept(projectCode, 
processDefinitionCode, 0);
+        List<ProcessLineage> totalProcessLineageList =
+                Stream.of(upstreamProcessLineageList, 
downstreamProcessLineageList)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+
+        List<WorkFlowRelation> workFlowRelationList = 
getWorkFlowRelations(totalProcessLineageList);
+        List<WorkFlowRelationDetail> workFlowRelationDetailList =
+                getWorkflowRelationDetails(totalProcessLineageList.stream()
+                        .flatMap(pl -> {
+                            List<Long> processDefinitionCodes = new 
ArrayList<>();
+                            
processDefinitionCodes.add(pl.getProcessDefinitionCode());
+                            
processDefinitionCodes.add(pl.getDeptProcessDefinitionCode());
+                            return processDefinitionCodes.stream();
+                        }).distinct().collect(Collectors.toList()));
+
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_RELATION_DETAIL_LIST, 
workFlowRelationDetailList);
+        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, 
workFlowRelationList);
+        result.put(Constants.DATA_LIST, workFlowLists);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    @Override
+    public Map<String, Object> queryWorkFlowLineage(long projectCode) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            log.error("Project does not exist, projectCode:{}.", projectCode);
+            putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
+            return result;
+        }
+        List<ProcessLineage> processLineageList = 
processLineageMapper.queryByProjectCode(projectCode);
+        List<WorkFlowRelation> workFlowRelationList = 
getWorkFlowRelations(processLineageList);
+        List<WorkFlowRelationDetail> workFlowRelationDetailList = 
getWorkflowRelationDetails(processLineageList.stream()
+                .flatMap(pl -> {
+                    List<Long> processDefinitionCodes = new ArrayList<>();
+                    processDefinitionCodes.add(pl.getProcessDefinitionCode());
+                    
processDefinitionCodes.add(pl.getDeptProcessDefinitionCode());
+                    return processDefinitionCodes.stream();
+                }).distinct().collect(Collectors.toList()));
+
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_RELATION_DETAIL_LIST, 
workFlowRelationDetailList);
+        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, 
workFlowRelationList);
+        result.put(Constants.DATA_LIST, workFlowLists);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    private List<WorkFlowRelation> getWorkFlowRelations(List<ProcessLineage> 
processLineageList) {
+        List<WorkFlowRelation> workFlowRelations = new ArrayList<>();
+        List<Long> processDefinitionCodes = processLineageList.stream()
+                
.map(ProcessLineage::getProcessDefinitionCode).distinct().collect(Collectors.toList());
+        for (ProcessLineage processLineage : processLineageList) {
+            workFlowRelations.add(new 
WorkFlowRelation(processLineage.getDeptProcessDefinitionCode(),
+                    processLineage.getProcessDefinitionCode()));
+
+            if 
(!processDefinitionCodes.contains(processLineage.getDeptProcessDefinitionCode()))
 {
+                workFlowRelations.add(new WorkFlowRelation(0, 
processLineage.getProcessDefinitionCode()));
+            }
+        }
+        return workFlowRelations;
+    }
+
+    private List<WorkFlowRelationDetail> getWorkflowRelationDetails(List<Long> 
processDefinitionCodes) {
+        List<WorkFlowRelationDetail> workFlowRelationDetails = new 
ArrayList<>();
+        for (Long processDefinitionCode : processDefinitionCodes) {
+            List<WorkFlowRelationDetail> workFlowRelationDetailList =
+                    
processLineageMapper.queryWorkFlowLineageByCode(processDefinitionCode);
+            workFlowRelationDetails.addAll(workFlowRelationDetailList);
+        }
+        return workFlowRelationDetails;
+    }
+
+    /**
+     * Query tasks depend on process definition, include upstream or downstream
+     * and return tasks dependence with string format.
+     *
+     * @param projectCode           Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @param taskCode              Task code want to query tasks dependence
+     * @return Optional of formatter message
+     */
+    @Override
+    public Optional<String> taskDependentMsg(long projectCode, long 
processDefinitionCode, long taskCode) {
+        long queryTaskCode = 0;
+        if (taskCode != 0) {
+            queryTaskCode = taskCode;
+        }
+        List<ProcessLineage> dependentProcessList =
+                processLineageMapper.queryWorkFlowLineageByDept(projectCode, 
processDefinitionCode, queryTaskCode);
+        if (CollectionUtils.isEmpty(dependentProcessList)) {
+            return Optional.empty();
+        }
+
+        List<String> taskDepStrList = new ArrayList<>();
+
+        for (ProcessLineage processLineage : dependentProcessList) {
+            ProcessDefinition processDefinition =
+                    
processDefinitionMapper.queryByCode(processLineage.getDeptProcessDefinitionCode());
+            String taskName = "";
+            if (processLineage.getTaskDefinitionCode() != 0) {
+                TaskDefinition taskDefinition =
+                        
taskDefinitionMapper.queryByCode(processLineage.getTaskDefinitionCode());
+                taskName = taskDefinition.getName();
+            }
+            taskDepStrList.add(String.format(Constants.FORMAT_S_S_COLON, 
processDefinition.getName(), taskName));
+        }
+
+        String taskDepStr = String.join(Constants.COMMA, taskDepStrList);
+        if (taskCode != 0) {
+            TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByCode(taskCode);
+            return Optional
+                    
.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), 
taskDefinition.getName(),
+                            taskDepStr));
+        } else {
+            return 
Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), 
"",
+                    taskDepStr));
+        }
+    }
+
+    /**
+     * Query downstream tasks depend on a process definition or a task
+     *
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @return downstream dependent process definition list
+     */
+    @Override
+    public List<DependentProcessDefinition> 
queryDownstreamDependentProcessDefinitions(Long processDefinitionCode) {
+        List<DependentProcessDefinition> dependentProcessDefinitionList = new 
ArrayList<>();
+        List<ProcessLineage> processLineageList =
+                processLineageMapper.queryWorkFlowLineageByDept(0, 
processDefinitionCode, 0);

Review Comment:
   Don't use the magic number 0; add a named constant instead.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -828,7 +888,7 @@ private void taskUsedInOtherTaskValid(ProcessDefinition 
processDefinition,
             boolean oldTaskExists = taskRelationList.stream()
                     .anyMatch(relation -> 
oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode());
             if (!oldTaskExists) {
-                Optional<String> taskDepMsg = 
workFlowLineageService.taskDepOnTaskMsg(
+                Optional<String> taskDepMsg = 
processLineageService.taskDependentMsg(

Review Comment:
   It seems the method `taskDependentMsg` isn't needed in 
`ProcessLineageService`; it's not a generic query.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java:
##########
@@ -2219,6 +2280,30 @@ public Map<String, Object> 
switchProcessDefinitionVersion(User loginUser, long p
             putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
             throw new 
ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
         }
+
+        List<ProcessTaskRelation> processTaskRelationList = 
processTaskRelationMapper
+                
.queryProcessTaskRelationsByProcessDefinitionCode(processDefinitionLog.getCode(),
+                        processDefinitionLog.getVersion());
+        List<TaskDefinition> taskDefinitionList = new ArrayList<>();
+        for (ProcessTaskRelation processTaskRelation : 
processTaskRelationList) {
+            if (processTaskRelation.getPreTaskCode() != 0) {
+                TaskDefinition taskDefinition = new TaskDefinition();
+                taskDefinition.setCode(processTaskRelation.getPreTaskCode());
+                
taskDefinition.setVersion(processTaskRelation.getPreTaskVersion());
+                taskDefinitionList.add(taskDefinition);
+            }
+            if (processTaskRelation.getPostTaskCode() != 0) {
+                TaskDefinition taskDefinition = new TaskDefinition();
+                taskDefinition.setCode(processTaskRelation.getPostTaskCode());
+                
taskDefinition.setVersion(processTaskRelation.getPostTaskVersion());
+                taskDefinitionList.add(taskDefinition);

Review Comment:
   It's better to use a DTO like `TaskCodeVersionDto`, because the other fields 
are null if the `TaskDefinition` entity is used, which can easily cause an NPE.



##########
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessLineageMapper.xml:
##########
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd"; >
+<mapper 
namespace="org.apache.dolphinscheduler.dao.mapper.ProcessLineageMapper">
+    <sql id="baseSql">
+        id
+        , process_definition_code
+        , process_definition_version
+        , task_definition_code
+        , task_definition_version
+        , dept_project_code
+        , dept_process_definition_code
+        , dept_task_definition_code
+        , create_time
+        , update_time
+    </sql>
+
+    <delete id="batchDeleteByProcessDefinitionCode">
+        delete from t_ds_process_lineage
+        where process_definition_code in
+        <foreach collection="processDefinitionCodes" index="index" item="i" 
open="(" separator="," close=")">
+            #{i}
+        </foreach>
+    </delete>
+
+    <insert id="batchInsert">
+        insert into t_ds_process_lineage (process_definition_code, 
process_definition_version, task_definition_code,
+        task_definition_version, dept_project_code, 
dept_process_definition_code, dept_task_definition_code)
+        values
+        <foreach collection="processLineages" item="processLineage" 
separator=",">
+            
(#{processLineage.processDefinitionCode},#{processLineage.processDefinitionVersion},
+            
#{processLineage.taskDefinitionCode},#{processLineage.taskDefinitionVersion},
+            
#{processLineage.deptProjectCode},#{processLineage.deptProcessDefinitionCode},
+            #{processLineage.deptTaskDefinitionCode})
+        </foreach>
+    </insert>
+
+    <select id="queryByProjectCode" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+        select
+            <include refid="baseSql"/>
+        from
+        t_ds_process_lineage
+        where process_definition_code in (select code from 
t_ds_process_definition where project_code = #{projectCode})
+    </select>
+
+    <select id="queryWorkFlowLineageByCode" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail">
+        select
+            tepd.code as work_flow_code
+            ,tepd.name as work_flow_name
+            ,tepd.release_state as work_flow_publish_status
+            ,tes.start_time as schedule_start_time
+            ,tes.end_time as schedule_end_time
+            ,tes.crontab as crontab
+            ,tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+        where tepd.code = #{processDefinitionCode}
+    </select>
+
+    <select id="queryWorkFlowLineageByName" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail">
+        select
+        tepd.code as work_flow_code
+        ,tepd.name as work_flow_name
+        ,tepd.release_state as work_flow_publish_status
+        ,tes.start_time as schedule_start_time
+        ,tes.end_time as schedule_end_time
+        ,tes.crontab as crontab
+        ,tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+        where 1=1
+        <if test="processDefinitionName != null and processDefinitionName != 
''">
+            and tepd.name = #{processDefinitionName}
+        </if>
+        and tepd.project_code = #{projectCode}
+    </select>
+
+    <select id="queryWorkFlowLineageByCode" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail">
+        select
+        tepd.code as work_flow_code
+        ,tepd.name as work_flow_name
+        ,tepd.release_state as work_flow_publish_status
+        ,tes.start_time as schedule_start_time
+        ,tes.end_time as schedule_end_time
+        ,tes.crontab as crontab
+        ,tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd

Review Comment:
   Use a uniform table-alias convention, e.g. t_ds_process_definition -> pd.
   ```suggestion
           from t_ds_process_definition pd
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessLineageServiceImpl.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProcessLineageService;
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
+import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessLineageMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+/**
+ * work flow lineage service impl
+ */
+@Slf4j
+@Service
+public class ProcessLineageServiceImpl extends BaseServiceImpl implements 
ProcessLineageService {
+
+    @Autowired
+    private ProjectMapper projectMapper;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private ProcessLineageMapper processLineageMapper;
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Override
+    public List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long 
projectCode, String processDefinitionName) {
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
+        }
+        return processLineageMapper.queryWorkFlowLineageByName(projectCode, 
processDefinitionName);
+    }
+
+    @Override
+    public Map<String, Object> queryWorkFlowLineageByCode(long projectCode, 
long processDefinitionCode) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            log.error("Project does not exist, projectCode:{}.", projectCode);
+            putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
+            return result;
+        }
+        List<ProcessLineage> upstreamProcessLineageList =
+                
processLineageMapper.queryByProcessDefinitionCode(processDefinitionCode);
+        List<ProcessLineage> downstreamProcessLineageList =
+                processLineageMapper.queryWorkFlowLineageByDept(projectCode, 
processDefinitionCode, 0);
+        List<ProcessLineage> totalProcessLineageList =
+                Stream.of(upstreamProcessLineageList, 
downstreamProcessLineageList)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+
+        List<WorkFlowRelation> workFlowRelationList = 
getWorkFlowRelations(totalProcessLineageList);
+        List<WorkFlowRelationDetail> workFlowRelationDetailList =
+                getWorkflowRelationDetails(totalProcessLineageList.stream()
+                        .flatMap(pl -> {
+                            List<Long> processDefinitionCodes = new 
ArrayList<>();
+                            
processDefinitionCodes.add(pl.getProcessDefinitionCode());
+                            
processDefinitionCodes.add(pl.getDeptProcessDefinitionCode());
+                            return processDefinitionCodes.stream();
+                        }).distinct().collect(Collectors.toList()));
+
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_RELATION_DETAIL_LIST, 
workFlowRelationDetailList);
+        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, 
workFlowRelationList);
+        result.put(Constants.DATA_LIST, workFlowLists);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    @Override
+    public Map<String, Object> queryWorkFlowLineage(long projectCode) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            log.error("Project does not exist, projectCode:{}.", projectCode);
+            putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
+            return result;
+        }
+        List<ProcessLineage> processLineageList = 
processLineageMapper.queryByProjectCode(projectCode);
+        List<WorkFlowRelation> workFlowRelationList = 
getWorkFlowRelations(processLineageList);
+        List<WorkFlowRelationDetail> workFlowRelationDetailList = 
getWorkflowRelationDetails(processLineageList.stream()
+                .flatMap(pl -> {
+                    List<Long> processDefinitionCodes = new ArrayList<>();
+                    processDefinitionCodes.add(pl.getProcessDefinitionCode());
+                    
processDefinitionCodes.add(pl.getDeptProcessDefinitionCode());
+                    return processDefinitionCodes.stream();
+                }).distinct().collect(Collectors.toList()));
+
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_RELATION_DETAIL_LIST, 
workFlowRelationDetailList);
+        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, 
workFlowRelationList);
+        result.put(Constants.DATA_LIST, workFlowLists);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    private List<WorkFlowRelation> getWorkFlowRelations(List<ProcessLineage> 
processLineageList) {
+        List<WorkFlowRelation> workFlowRelations = new ArrayList<>();
+        List<Long> processDefinitionCodes = processLineageList.stream()
+                
.map(ProcessLineage::getProcessDefinitionCode).distinct().collect(Collectors.toList());
+        for (ProcessLineage processLineage : processLineageList) {
+            workFlowRelations.add(new 
WorkFlowRelation(processLineage.getDeptProcessDefinitionCode(),
+                    processLineage.getProcessDefinitionCode()));
+
+            if 
(!processDefinitionCodes.contains(processLineage.getDeptProcessDefinitionCode()))
 {
+                workFlowRelations.add(new WorkFlowRelation(0, 
processLineage.getProcessDefinitionCode()));
+            }
+        }
+        return workFlowRelations;
+    }
+
+    private List<WorkFlowRelationDetail> getWorkflowRelationDetails(List<Long> 
processDefinitionCodes) {
+        List<WorkFlowRelationDetail> workFlowRelationDetails = new 
ArrayList<>();
+        for (Long processDefinitionCode : processDefinitionCodes) {
+            List<WorkFlowRelationDetail> workFlowRelationDetailList =
+                    
processLineageMapper.queryWorkFlowLineageByCode(processDefinitionCode);
+            workFlowRelationDetails.addAll(workFlowRelationDetailList);
+        }
+        return workFlowRelationDetails;
+    }
+
+    /**
+     * Query tasks depend on process definition, include upstream or downstream
+     * and return tasks dependence with string format.
+     *
+     * @param projectCode           Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @param taskCode              Task code want to query tasks dependence
+     * @return Optional of formatter message
+     */
+    @Override
+    public Optional<String> taskDependentMsg(long projectCode, long 
processDefinitionCode, long taskCode) {
+        long queryTaskCode = 0;
+        if (taskCode != 0) {
+            queryTaskCode = taskCode;
+        }
+        List<ProcessLineage> dependentProcessList =
+                processLineageMapper.queryWorkFlowLineageByDept(projectCode, 
processDefinitionCode, queryTaskCode);
+        if (CollectionUtils.isEmpty(dependentProcessList)) {
+            return Optional.empty();
+        }
+
+        List<String> taskDepStrList = new ArrayList<>();
+
+        for (ProcessLineage processLineage : dependentProcessList) {
+            ProcessDefinition processDefinition =
+                    
processDefinitionMapper.queryByCode(processLineage.getDeptProcessDefinitionCode());
+            String taskName = "";
+            if (processLineage.getTaskDefinitionCode() != 0) {
+                TaskDefinition taskDefinition =
+                        
taskDefinitionMapper.queryByCode(processLineage.getTaskDefinitionCode());
+                taskName = taskDefinition.getName();
+            }
+            taskDepStrList.add(String.format(Constants.FORMAT_S_S_COLON, 
processDefinition.getName(), taskName));
+        }
+
+        String taskDepStr = String.join(Constants.COMMA, taskDepStrList);
+        if (taskCode != 0) {
+            TaskDefinition taskDefinition = 
taskDefinitionMapper.queryByCode(taskCode);
+            return Optional
+                    
.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), 
taskDefinition.getName(),
+                            taskDepStr));
+        } else {
+            return 
Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), 
"",
+                    taskDepStr));
+        }
+    }
+
+    /**
+     * Query downstream tasks depend on a process definition or a task
+     *
+     * @param processDefinitionCode Process definition code want to query 
tasks dependence
+     * @return downstream dependent process definition list
+     */
+    @Override
+    public List<DependentProcessDefinition> 
queryDownstreamDependentProcessDefinitions(Long processDefinitionCode) {
+        List<DependentProcessDefinition> dependentProcessDefinitionList = new 
ArrayList<>();
+        List<ProcessLineage> processLineageList =
+                processLineageMapper.queryWorkFlowLineageByDept(0, 
processDefinitionCode, 0);
+        if (processLineageList.isEmpty()) {
+            return dependentProcessDefinitionList;
+        }
+
+        List<ProcessDefinition> processDefinitionList = 
processDefinitionMapper.queryByCodes(processLineageList.stream()
+                
.map(ProcessLineage::getDeptProcessDefinitionCode).distinct().collect(Collectors.toList()));
+        List<TaskDefinition> taskDefinitionList = 
taskDefinitionMapper.queryByCodeList(processLineageList.stream()
+                
.map(ProcessLineage::getDeptTaskDefinitionCode).distinct().collect(Collectors.toList()));
+        for (TaskDefinition taskDefinition : taskDefinitionList) {
+            DependentProcessDefinition dependentProcessDefinition = new 
DependentProcessDefinition();
+            processLineageList.stream()
+                    .filter(processLineage -> 
processLineage.getDeptTaskDefinitionCode() == taskDefinition.getCode())
+                    .findFirst()
+                    .ifPresent(processLineage -> {
+                        dependentProcessDefinition
+                                
.setProcessDefinitionCode(processLineage.getDeptProcessDefinitionCode());
+                        
dependentProcessDefinition.setTaskDefinitionCode(taskDefinition.getCode());
+                        
dependentProcessDefinition.setTaskParams(taskDefinition.getTaskParams());
+                        
dependentProcessDefinition.setWorkerGroup(taskDefinition.getWorkerGroup());
+                    });
+            processDefinitionList.stream()
+                    .filter(processDefinition -> processDefinition.getCode() 
== dependentProcessDefinition
+                            .getProcessDefinitionCode())
+                    .findFirst()
+                    .ifPresent(processDefinition -> {
+                        
dependentProcessDefinition.setProcessDefinitionVersion(processDefinition.getVersion());
+                    });
+        }
+
+        return dependentProcessDefinitionList;
+    }
+
+    @Override
+    public Map<String, Object> queryDependentProcessDefinitions(long 
projectCode, long processDefinitionCode,
+                                                                Long taskCode) 
{
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            log.error("Project does not exist, projectCode:{}.", projectCode);
+            putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);

Review Comment:
   Throw a `ServiceException` instead of using `Result`.



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java:
##########
@@ -17,99 +17,32 @@
 
 package org.apache.dolphinscheduler.dao.entity;
 
-/**
- * Process lineage
- */
-public class ProcessLineage {
-
-    /**
-     * project code
-     */
-    private long projectCode;
+import java.util.Date;
 
-    /**
-     * post task code
-     */
-    private long postTaskCode;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-    /**
-     * post task version
-     */
-    private int postTaskVersion;
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
 
-    /**
-     * pre task code
-     */
-    private long preTaskCode;
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("t_ds_process_lineage")

Review Comment:
   ```suggestion
   @TableName("t_ds_process_task_lineage")
   ```



##########
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+DROP TABLE IF EXISTS `t_ds_process_lineage`;
+CREATE TABLE `t_ds_process_lineage`

Review Comment:
   Need to check whether the table exists in the DDL.



##########
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessLineageMapper.xml:
##########
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper 
namespace="org.apache.dolphinscheduler.dao.mapper.ProcessLineageMapper">
+    <sql id="baseSql">
+        id
+        , process_definition_code
+        , process_definition_version
+        , task_definition_code
+        , task_definition_version
+        , dept_project_code
+        , dept_process_definition_code
+        , dept_task_definition_code
+        , create_time
+        , update_time
+    </sql>
+
+    <delete id="batchDeleteByProcessDefinitionCode">
+        delete from t_ds_process_lineage
+        where process_definition_code in
+        <foreach collection="processDefinitionCodes" index="index" item="i" 
open="(" separator="," close=")">
+            #{i}
+        </foreach>
+    </delete>
+
+    <insert id="batchInsert">
+        insert into t_ds_process_lineage (process_definition_code, 
process_definition_version, task_definition_code,
+        task_definition_version, dept_project_code, 
dept_process_definition_code, dept_task_definition_code)
+        values
+        <foreach collection="processLineages" item="processLineage" 
separator=",">
+            
(#{processLineage.processDefinitionCode},#{processLineage.processDefinitionVersion},
+            
#{processLineage.taskDefinitionCode},#{processLineage.taskDefinitionVersion},
+            
#{processLineage.deptProjectCode},#{processLineage.deptProcessDefinitionCode},
+            #{processLineage.deptTaskDefinitionCode})
+        </foreach>
+    </insert>
+
+    <select id="queryByProjectCode" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+        select
+            <include refid="baseSql"/>
+        from
+        t_ds_process_lineage
+        where process_definition_code in (select code from 
t_ds_process_definition where project_code = #{projectCode})
+    </select>
+
+    <select id="queryWorkFlowLineageByCode" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail">
+        select
+            tepd.code as work_flow_code
+            ,tepd.name as work_flow_name
+            ,tepd.release_state as work_flow_publish_status
+            ,tes.start_time as schedule_start_time
+            ,tes.end_time as schedule_end_time
+            ,tes.crontab as crontab
+            ,tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+        where tepd.code = #{processDefinitionCode}
+    </select>
+
+    <select id="queryWorkFlowLineageByName" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail">
+        select
+        tepd.code as work_flow_code
+        ,tepd.name as work_flow_name
+        ,tepd.release_state as work_flow_publish_status
+        ,tes.start_time as schedule_start_time
+        ,tes.end_time as schedule_end_time
+        ,tes.crontab as crontab
+        ,tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+        where 1=1
+        <if test="processDefinitionName != null and processDefinitionName != 
''">
+            and tepd.name = #{processDefinitionName}
+        </if>
+        and tepd.project_code = #{projectCode}
+    </select>
+
+    <select id="queryWorkFlowLineageByCode" 
resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail">
+        select
+        tepd.code as work_flow_code
+        ,tepd.name as work_flow_name
+        ,tepd.release_state as work_flow_publish_status
+        ,tes.start_time as schedule_start_time
+        ,tes.end_time as schedule_end_time
+        ,tes.crontab as crontab
+        ,tes.release_state as schedule_publish_status
+        from t_ds_process_definition tepd
+        left join t_ds_schedules tes on tepd.code = tes.process_definition_code
+        where 1=1
+        and tepd.code = #{processDefinitionCode}
+        and tepd.project_code = #{projectCode}
+    </select>
+
+    <select id="queryWorkFlowLineageByDept" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+        select
+            <include refid="baseSql"/>
+        from
+        t_ds_process_lineage
+        where 1=1
+        <if test="deptProjectCode != null and deptProjectCode != 0">
+        and dept_project_code = #{deptProjectCode}
+        </if>
+        and dept_process_definition_code = #{deptProcessDefinitionCode}
+        <if test="deptTaskDefinitionCode != null and deptTaskDefinitionCode != 
0">
+        and dept_task_definition_code = #{deptTaskDefinitionCode}
+        </if>
+    </select>
+
+    <select id="queryByProcessDefinitionCode" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
+        select
+            <include refid="baseSql"/>
+        from
+        t_ds_process_lineage
+        where process_definition_code = #{processDefinitionCode}
+    </select>
+
+    <update id="truncateTable">
+        truncate table t_ds_process_lineage
+    </update>

Review Comment:
   ```suggestion
   ```



##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessLineageMapper.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.mapper;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
+import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface ProcessLineageMapper extends BaseMapper<ProcessLineage> {
+
+    int batchDeleteByProcessDefinitionCode(@Param("processDefinitionCodes") 
List<Long> processDefinitionCodes);
+
+    int batchInsert(@Param("processLineages") List<ProcessLineage> 
processLineages);
+
+    List<ProcessLineage> queryByProjectCode(@Param("projectCode") long 
projectCode);
+
+    List<WorkFlowRelationDetail> 
queryWorkFlowLineageByCode(@Param("processDefinitionCode") long 
processDefinitionCode);
+
+    List<WorkFlowRelationDetail> 
queryWorkFlowLineageByName(@Param("projectCode") long projectCode,
+                                                            
@Param("processDefinitionName") String processDefinitionName);
+
+    List<ProcessLineage> queryWorkFlowLineageByDept(@Param("deptProjectCode") 
long deptProjectCode,
+                                                    
@Param("deptProcessDefinitionCode") long deptProcessDefinitionCode,
+                                                    
@Param("deptTaskDefinitionCode") long deptTaskDefinitionCode);
+
+    List<ProcessLineage> 
queryByProcessDefinitionCode(@Param("processDefinitionCode") long 
processDefinitionCode);
+
+    void truncateTable();

Review Comment:
   ```suggestion
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessLineageServiceImpl.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.ProcessLineageService;
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
+import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
+import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessLineageMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+/**
+ * work flow lineage service impl
+ */
+@Slf4j
+@Service
+public class ProcessLineageServiceImpl extends BaseServiceImpl implements 
ProcessLineageService {
+
+    @Autowired
+    private ProjectMapper projectMapper;
+
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
+    @Autowired
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Autowired
+    private ProcessLineageMapper processLineageMapper;
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Override
+    public List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long 
projectCode, String processDefinitionName) {
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
+        }
+        return processLineageMapper.queryWorkFlowLineageByName(projectCode, 
processDefinitionName);
+    }
+
+    @Override
+    public Map<String, Object> queryWorkFlowLineageByCode(long projectCode, 
long processDefinitionCode) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            log.error("Project does not exist, projectCode:{}.", projectCode);
+            putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
+            return result;
+        }
+        List<ProcessLineage> upstreamProcessLineageList =
+                
processLineageMapper.queryByProcessDefinitionCode(processDefinitionCode);
+        List<ProcessLineage> downstreamProcessLineageList =
+                processLineageMapper.queryWorkFlowLineageByDept(projectCode, 
processDefinitionCode, 0);
+        List<ProcessLineage> totalProcessLineageList =
+                Stream.of(upstreamProcessLineageList, 
downstreamProcessLineageList)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+
+        List<WorkFlowRelation> workFlowRelationList = 
getWorkFlowRelations(totalProcessLineageList);
+        List<WorkFlowRelationDetail> workFlowRelationDetailList =
+                getWorkflowRelationDetails(totalProcessLineageList.stream()
+                        .flatMap(pl -> {
+                            List<Long> processDefinitionCodes = new 
ArrayList<>();
+                            
processDefinitionCodes.add(pl.getProcessDefinitionCode());
+                            
processDefinitionCodes.add(pl.getDeptProcessDefinitionCode());
+                            return processDefinitionCodes.stream();
+                        }).distinct().collect(Collectors.toList()));
+
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_RELATION_DETAIL_LIST, 
workFlowRelationDetailList);
+        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, 
workFlowRelationList);
+        result.put(Constants.DATA_LIST, workFlowLists);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    @Override
+    public Map<String, Object> queryWorkFlowLineage(long projectCode) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            log.error("Project does not exist, projectCode:{}.", projectCode);
+            putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
+            return result;
+        }
+        List<ProcessLineage> processLineageList = 
processLineageMapper.queryByProjectCode(projectCode);
+        List<WorkFlowRelation> workFlowRelationList = 
getWorkFlowRelations(processLineageList);
+        List<WorkFlowRelationDetail> workFlowRelationDetailList = 
getWorkflowRelationDetails(processLineageList.stream()
+                .flatMap(pl -> {
+                    List<Long> processDefinitionCodes = new ArrayList<>();
+                    processDefinitionCodes.add(pl.getProcessDefinitionCode());
+                    
processDefinitionCodes.add(pl.getDeptProcessDefinitionCode());
+                    return processDefinitionCodes.stream();
+                }).distinct().collect(Collectors.toList()));
+
+        Map<String, Object> workFlowLists = new HashMap<>();
+        workFlowLists.put(Constants.WORKFLOW_RELATION_DETAIL_LIST, 
workFlowRelationDetailList);
+        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, 
workFlowRelationList);
+        result.put(Constants.DATA_LIST, workFlowLists);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * Turn lineage rows into relation edges.
+     *
+     * <p>Each row yields one edge (dept process definition code -> process definition code). When the
+     * dept code never occurs as a process definition code in the given rows, an additional edge
+     * (0 -> process definition code) is emitted for that row.
+     *
+     * @param processLineageList lineage rows to convert
+     * @return relation edges in row order
+     */
+    private List<WorkFlowRelation> getWorkFlowRelations(List<ProcessLineage> processLineageList) {
+        List<Long> knownProcessCodes = processLineageList.stream()
+                .map(ProcessLineage::getProcessDefinitionCode)
+                .distinct()
+                .collect(Collectors.toList());
+
+        List<WorkFlowRelation> relations = new ArrayList<>();
+        for (ProcessLineage lineage : processLineageList) {
+            relations.add(new WorkFlowRelation(
+                    lineage.getDeptProcessDefinitionCode(), lineage.getProcessDefinitionCode()));
+
+            boolean deptIsKnownProcess = knownProcessCodes.contains(lineage.getDeptProcessDefinitionCode());
+            if (deptIsKnownProcess) {
+                continue;
+            }
+            relations.add(new WorkFlowRelation(0, lineage.getProcessDefinitionCode()));
+        }
+        return relations;
+    }
+
+    /**
+     * Load the relation details of every given process definition code, one mapper query per code,
+     * and concatenate the results in the order of the input codes.
+     *
+     * @param processDefinitionCodes codes to look up
+     * @return all details returned by the mapper for the given codes
+     */
+    private List<WorkFlowRelationDetail> getWorkflowRelationDetails(List<Long> processDefinitionCodes) {
+        return processDefinitionCodes.stream()
+                .flatMap(code -> processLineageMapper.queryWorkFlowLineageByCode(code).stream())
+                .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+    /**
+     * Query the lineage rows that depend on a process definition (or on one of its tasks) and
+     * render them as a human readable message.
+     *
+     * <p>Definitions referenced by a lineage row are looked up one by one. A row whose process or
+     * task definition can no longer be found contributes an empty name instead of failing the
+     * whole call.
+     *
+     * @param projectCode           Project code want to query tasks dependence
+     * @param processDefinitionCode Process definition code want to query tasks dependence
+     * @param taskCode              Task code want to query tasks dependence, 0 when no specific task is meant
+     * @return Optional of formatter message, empty when the mapper returns no lineage rows
+     */
+    @Override
+    public Optional<String> taskDependentMsg(long projectCode, long processDefinitionCode, long taskCode) {
+        List<ProcessLineage> dependentProcessList =
+                processLineageMapper.queryWorkFlowLineageByDept(projectCode, processDefinitionCode, taskCode);
+        if (CollectionUtils.isEmpty(dependentProcessList)) {
+            return Optional.empty();
+        }
+
+        List<String> taskDepStrList = new ArrayList<>();
+        for (ProcessLineage processLineage : dependentProcessList) {
+            ProcessDefinition processDefinition =
+                    processDefinitionMapper.queryByCode(processLineage.getDeptProcessDefinitionCode());
+            // The mapper may return null for a stale lineage row; do not let that abort the message.
+            String processName = processDefinition == null ? "" : processDefinition.getName();
+
+            String taskName = "";
+            if (processLineage.getTaskDefinitionCode() != 0) {
+                TaskDefinition taskDefinition =
+                        taskDefinitionMapper.queryByCode(processLineage.getTaskDefinitionCode());
+                if (taskDefinition != null) {
+                    taskName = taskDefinition.getName();
+                }
+            }
+            taskDepStrList.add(String.format(Constants.FORMAT_S_S_COLON, processName, taskName));
+        }
+        String taskDepStr = String.join(Constants.COMMA, taskDepStrList);
+
+        // Name of the task the caller asked about; stays empty for taskCode == 0 or an unknown task.
+        String requestedTaskName = "";
+        if (taskCode != 0) {
+            TaskDefinition requestedTask = taskDefinitionMapper.queryByCode(taskCode);
+            if (requestedTask != null) {
+                requestedTaskName = requestedTask.getName();
+            }
+        }
+        return Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(),
+                requestedTaskName, taskDepStr));
+    }
+
+    /**
+     * Query downstream tasks depend on a process definition or a task
+     *
+     * <p>For every task definition referenced by the lineage rows, one
+     * {@code DependentProcessDefinition} is built from the first matching lineage row and, when the
+     * owning process definition is found, completed with that definition's version.
+     *
+     * <p>NOTE(review): this method reads the {@code dept*} codes of the rows returned by
+     * {@code queryWorkFlowLineageByDept}, whereas {@code queryDependentProcessDefinitions} reads the
+     * non-dept codes of rows from the same mapper method - confirm which side is the downstream one.
+     *
+     * @param processDefinitionCode Process definition code want to query tasks dependence
+     * @return downstream dependent process definition list, empty when there are no lineage rows
+     */
+    @Override
+    public List<DependentProcessDefinition> queryDownstreamDependentProcessDefinitions(Long processDefinitionCode) {
+        List<DependentProcessDefinition> dependentProcessDefinitionList = new ArrayList<>();
+        List<ProcessLineage> processLineageList =
+                processLineageMapper.queryWorkFlowLineageByDept(0, processDefinitionCode, 0);
+        if (processLineageList.isEmpty()) {
+            return dependentProcessDefinitionList;
+        }
+
+        List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(
+                processLineageList.stream()
+                        .map(ProcessLineage::getDeptProcessDefinitionCode)
+                        .distinct()
+                        .collect(Collectors.toList()));
+        List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.queryByCodeList(
+                processLineageList.stream()
+                        .map(ProcessLineage::getDeptTaskDefinitionCode)
+                        .distinct()
+                        .collect(Collectors.toList()));
+
+        for (TaskDefinition taskDefinition : taskDefinitionList) {
+            Optional<ProcessLineage> matchedLineage = processLineageList.stream()
+                    .filter(processLineage -> processLineage.getDeptTaskDefinitionCode() == taskDefinition.getCode())
+                    .findFirst();
+            if (!matchedLineage.isPresent()) {
+                // Without a lineage row there is no owning process definition to report.
+                continue;
+            }
+
+            DependentProcessDefinition dependentProcessDefinition = new DependentProcessDefinition();
+            dependentProcessDefinition.setProcessDefinitionCode(matchedLineage.get().getDeptProcessDefinitionCode());
+            dependentProcessDefinition.setTaskDefinitionCode(taskDefinition.getCode());
+            dependentProcessDefinition.setTaskParams(taskDefinition.getTaskParams());
+            dependentProcessDefinition.setWorkerGroup(taskDefinition.getWorkerGroup());
+
+            processDefinitionList.stream()
+                    .filter(processDefinition -> processDefinition.getCode() == dependentProcessDefinition
+                            .getProcessDefinitionCode())
+                    .findFirst()
+                    .ifPresent(processDefinition -> dependentProcessDefinition
+                            .setProcessDefinitionVersion(processDefinition.getVersion()));
+
+            // Previously the populated object was dropped here, so callers always got an empty list.
+            dependentProcessDefinitionList.add(dependentProcessDefinition);
+        }
+
+        return dependentProcessDefinitionList;
+    }
+
+    @Override
+    public Map<String, Object> queryDependentProcessDefinitions(long 
projectCode, long processDefinitionCode,
+                                                                Long taskCode) 
{
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByCode(projectCode);
+        if (project == null) {
+            log.error("Project does not exist, projectCode:{}.", projectCode);
+            putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
+            return result;
+        }
+        List<ProcessLineage> processLineageList = 
processLineageMapper.queryWorkFlowLineageByDept(projectCode,
+                processDefinitionCode, taskCode == null ? 0 : taskCode);
+        List<ProcessDefinition> processDefinitionList = 
processDefinitionMapper.queryByCodes(processLineageList.stream()
+                
.map(ProcessLineage::getProcessDefinitionCode).distinct().collect(Collectors.toList()));
+        List<TaskDefinition> taskDefinitionList = 
taskDefinitionMapper.queryByCodeList(processLineageList.stream()
+                .map(ProcessLineage::getTaskDefinitionCode).filter(code -> 
code != 0).distinct()
+                .collect(Collectors.toList()));
+        List<DependentLineageTask> dependentLineageTaskList = new 
ArrayList<>();
+        for (ProcessLineage processLineage : processLineageList) {
+            DependentLineageTask dependentLineageTask = new 
DependentLineageTask();
+            taskDefinitionList.stream()
+                    .filter(taskDefinition -> taskDefinition.getCode() == 
processLineage.getTaskDefinitionCode())
+                    .findFirst()
+                    .ifPresent(taskDefinition -> {
+                        
dependentLineageTask.setTaskDefinitionCode(taskDefinition.getCode());
+                        
dependentLineageTask.setTaskDefinitionName(taskDefinition.getName());
+                    });
+            processDefinitionList.stream()
+                    .filter(processDefinition -> processDefinition.getCode() 
== processLineage
+                            .getProcessDefinitionCode())
+                    .findFirst()
+                    .ifPresent(processDefinition -> {
+                        
dependentLineageTask.setProcessDefinitionCode(processDefinition.getCode());
+                        
dependentLineageTask.setProcessDefinitionName(processDefinition.getName());
+                        
dependentLineageTask.setProjectCode(processDefinition.getProjectCode());
+                    });
+            dependentLineageTaskList.add(dependentLineageTask);
+        }
+        result.put(Constants.DATA_LIST, dependentLineageTaskList);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * Insert the given lineage rows in one batch by delegating to
+     * {@code processLineageMapper.batchInsert}.
+     *
+     * @param processLineages lineage rows to insert
+     * @return the value returned by the mapper's batch insert (presumably the number of inserted
+     *         rows - confirm against the mapper)
+     */
+    @Override
+    public int createProcessLineage(List<ProcessLineage> processLineages) {
+        return processLineageMapper.batchInsert(processLineages);
+    }
+
+    /**
+     * Replace the lineage of the process definitions referenced by the given rows: first delete by
+     * the distinct process definition codes found in the rows, then batch insert the rows.
+     *
+     * @param processLineages the new lineage rows
+     * @return the value returned by the mapper's batch insert
+     */
+    @Override
+    public int updateProcessLineage(List<ProcessLineage> processLineages) {
+        List<Long> affectedProcessCodes = processLineages.stream()
+                .map(ProcessLineage::getProcessDefinitionCode)
+                .distinct()
+                .collect(Collectors.toList());
+        processLineageMapper.batchDeleteByProcessDefinitionCode(affectedProcessCodes);
+
+        return processLineageMapper.batchInsert(processLineages);
+    }

Review Comment:
   I think it's better to provide an upsert method in the DAO.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java:
##########
@@ -427,4 +427,9 @@ ProcessDefinition updateSingleProcessDefinition(User 
loginUser,
      * @return variables data
      */
     Map<String, Object> viewVariables(User loginUser, long projectCode, long 
code);
+
+    /**
+     * Save the lineage of one process definition version.
+     * NOTE(review): the implementation is not visible here; the description is taken from the
+     * method and parameter names only - confirm against the implementing class.
+     *
+     * @param taskDefinitionLogList    task definition logs of the process definition
+     * @param projectCode              project code
+     * @param processDefinitionCode    process definition code
+     * @param processDefinitionVersion process definition version
+     */
+    void saveProcessLineage(List<TaskDefinitionLog> taskDefinitionLogList,
+                            long projectCode,
+                            long processDefinitionCode,
+                            int processDefinitionVersion);

Review Comment:
   Should this be changed to `ProcessTaskLineage`? It does not just keep the workflow 
relations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to