This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b9b6d51072 [Chore] Remove unused code in ProcessService (#17391)
b9b6d51072 is described below
commit b9b6d51072445e25b4f6372a1552c107c712ae9f
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Aug 6 00:23:43 2025 +0800
[Chore] Remove unused code in ProcessService (#17391)
---
.../service/impl/TaskDefinitionServiceImpl.java | 1 -
.../service/process/ProcessService.java | 33 --
.../service/process/ProcessServiceImpl.java | 620 ---------------------
.../service/process/ProcessServiceTest.java | 113 ----
4 files changed, 767 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 0dfa0ce719..449af5a38d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -399,7 +399,6 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
taskDefinitionToUpdate.setUserId(taskDefinition.getUserId());
taskDefinitionToUpdate.setVersion(++version);
taskDefinitionToUpdate.setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase());
-
taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate));
taskDefinitionToUpdate.setUpdateTime(now);
int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate);
taskDefinitionToUpdate.setOperator(loginUser.getId());
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index f070cf29e2..d45e6549e2 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -18,16 +18,12 @@
package org.apache.dolphinscheduler.service.process;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
-import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
-import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -35,46 +31,29 @@ import
org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelationLog;
-import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.model.TaskNode;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
public interface ProcessService {
- WorkflowInstance constructWorkflowInstance(Command command,
- String host) throws
CronParseException;
-
Optional<WorkflowInstance> findWorkflowInstanceDetailById(int
workflowInstanceId);
WorkflowInstance findWorkflowInstanceById(int workflowInstanceId);
WorkflowDefinition findWorkflowDefinition(Long workflowDefinitionCode, int
workflowDefinitionVersion);
- WorkflowDefinition findWorkflowDefinitionByCode(Long
workflowDefinitionCode);
-
int deleteWorkflowInstanceById(int workflowInstanceId);
- int deleteAllSubWorkflowByParentId(int workflowInstanceId);
-
- void removeTaskLogFile(Integer workflowInstanceId);
-
List<Long> findAllSubWorkflowDefinitionCode(long workflowDefinitionCode);
String getTenantForWorkflow(String tenantCode, int userId);
- int deleteWorkflowMapByParentId(int parentWorkflowId);
-
WorkflowInstance findSubWorkflowInstance(Integer parentWorkflowInstanceId,
Integer parentTaskId);
WorkflowInstance findParentWorkflowInstance(Integer subWorkflowInstanceId);
- void changeOutParam(TaskInstance taskInstance);
-
- Schedule querySchedule(int id);
-
List<Schedule> queryReleaseSchedulerListByWorkflowDefinitionCode(long
workflowDefinitionCode);
DataSource findDataSourceById(int id);
@@ -83,16 +62,12 @@ public interface ProcessService {
User getUserById(int userId);
- String formatTaskAppId(TaskInstance taskInstance);
-
int switchVersion(WorkflowDefinition workflowDefinition,
WorkflowDefinitionLog workflowDefinitionLog);
int switchWorkflowTaskRelationVersion(WorkflowDefinition
workflowDefinition);
int switchTaskDefinitionVersion(long taskCode, int taskVersion);
- String getResourceIds(TaskDefinition taskDefinition);
-
int saveTaskDefine(User operator, long projectCode,
List<TaskDefinitionLog> taskDefinitionLogs, Boolean syncDefine);
int saveWorkflowDefine(User operator, WorkflowDefinition
workflowDefinition, Boolean syncDefine,
@@ -113,16 +88,8 @@ public interface ProcessService {
List<TaskNode> transformTask(List<WorkflowTaskRelation> taskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs);
- TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
- String taskName,
- Integer groupId,
- Integer workflowInstanceId,
- Integer priority,
- TaskGroupQueueStatus status);
-
String findConfigYamlByName(String clusterName);
void forceWorkflowInstanceSuccessByTaskInstanceId(TaskInstance
taskInstance);
- void setGlobalParamIfCommanded(WorkflowDefinition workflowDefinition,
Map<String, String> cmdParam);
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index a1fa3c51de..bf6805afa4 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -18,36 +18,24 @@
package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
-import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
-import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_WORKFLOW_DEFINITION_CODE;
-import static
org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
-import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
-import org.apache.dolphinscheduler.common.enums.TaskDependType;
-import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Cluster;
-import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
-import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -58,12 +46,10 @@ import
org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelationLog;
import org.apache.dolphinscheduler.dao.mapper.ClusterMapper;
-import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
@@ -76,28 +62,14 @@ import
org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
-import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
-import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
-import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
-import org.apache.dolphinscheduler.extract.base.client.Clients;
-import org.apache.dolphinscheduler.extract.common.ILogService;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.SubWorkflowParameters;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
-import org.apache.dolphinscheduler.service.command.CommandService;
-import org.apache.dolphinscheduler.service.cron.CronUtils;
-import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.utils.ClusterConfUtils;
import org.apache.dolphinscheduler.service.utils.DagHelper;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
@@ -115,15 +87,12 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -160,18 +129,12 @@ public class ProcessServiceImpl implements ProcessService
{
@Autowired
private TaskDefinitionLogDao taskDefinitionLogDao;
- @Autowired
- private WorkflowInstanceMapDao workflowInstanceMapDao;
-
@Autowired
private DataSourceMapper dataSourceMapper;
@Autowired
private WorkflowInstanceRelationMapper workflowInstanceRelationMapper;
- @Autowired
- private CommandMapper commandMapper;
-
@Autowired
private ScheduleMapper scheduleMapper;
@@ -190,18 +153,12 @@ public class ProcessServiceImpl implements ProcessService
{
@Autowired
private WorkflowTaskRelationLogMapper workflowTaskRelationLogMapper;
- @Autowired
- private TaskGroupQueueMapper taskGroupQueueMapper;
-
@Autowired
private ClusterMapper clusterMapper;
@Autowired
private CuringParamsService curingGlobalParamsService;
- @Autowired
- private CommandService commandService;
-
/**
* find workflow instance detail by id
*
@@ -243,17 +200,6 @@ public class ProcessServiceImpl implements ProcessService {
return workflowDefinition;
}
- /**
- * find workflow define by code.
- *
- * @param workflowDefinitionCode workflowDefinitionCode
- * @return workflow definition
- */
- @Override
- public WorkflowDefinition findWorkflowDefinitionByCode(Long
workflowDefinitionCode) {
- return workflowDefinitionMapper.queryByCode(workflowDefinitionCode);
- }
-
/**
* delete work workflow instance by id
*
@@ -265,49 +211,6 @@ public class ProcessServiceImpl implements ProcessService {
return workflowInstanceMapper.deleteById(workflowInstanceId);
}
- /**
- * delete all sub workflow by parent instance id
- *
- * @param workflowInstanceId workflowInstanceId
- * @return delete all sub workflow instance result
- */
- @Override
- public int deleteAllSubWorkflowByParentId(int workflowInstanceId) {
-
- List<Integer> subWorkflowIdList =
workflowInstanceRelationMapper.querySubIdListByParentId(workflowInstanceId);
-
- for (Integer subId : subWorkflowIdList) {
- deleteAllSubWorkflowByParentId(subId);
- deleteWorkflowMapByParentId(subId);
- removeTaskLogFile(subId);
- deleteWorkflowInstanceById(subId);
- }
- return 1;
- }
-
- /**
- * remove task log file
- *
- * @param workflowInstanceId workflowInstanceId
- */
- @Override
- public void removeTaskLogFile(Integer workflowInstanceId) {
- List<TaskInstance> taskInstanceList =
taskInstanceDao.queryByWorkflowInstanceId(workflowInstanceId);
- if (CollectionUtils.isEmpty(taskInstanceList)) {
- return;
- }
- for (TaskInstance taskInstance : taskInstanceList) {
- String taskLogPath = taskInstance.getLogPath();
- if (StringUtils.isEmpty(taskInstance.getHost()) ||
StringUtils.isEmpty(taskLogPath)) {
- continue;
- }
- Clients
- .withService(ILogService.class)
- .withHost(taskInstance.getHost())
- .removeTaskInstanceLog(taskLogPath);
- }
- }
-
/**
* recursive query sub workflow definition id by parent id.
*
@@ -334,108 +237,6 @@ public class ProcessServiceImpl implements ProcessService
{
return subWorkflowDefinitionCodes;
}
- /**
- * generate a new work workflow instance from command.
- *
- * @param workflowDefinition workflowDefinition
- * @param command command
- * @param cmdParam cmdParam map
- * @return workflow instance
- */
- private WorkflowInstance generateNewWorkflowInstance(WorkflowDefinition
workflowDefinition,
- Command command,
- Map<String, String>
cmdParam) {
- WorkflowInstance workflowInstance = new
WorkflowInstance(workflowDefinition);
-
workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode());
-
workflowInstance.setWorkflowDefinitionVersion(workflowDefinition.getVersion());
- workflowInstance.setProjectCode(workflowDefinition.getProjectCode());
-
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"init running");
- workflowInstance.setRecovery(Flag.NO);
- workflowInstance.setStartTime(new Date());
- // the new workflow instance restart time is null.
- workflowInstance.setRestartTime(null);
- workflowInstance.setRunTimes(1);
- workflowInstance.setMaxTryTimes(0);
- workflowInstance.setCommandParam(command.getCommandParam());
- workflowInstance.setCommandType(command.getCommandType());
- workflowInstance.setIsSubWorkflow(Flag.NO);
- workflowInstance.setTaskDependType(command.getTaskDependType());
- workflowInstance.setFailureStrategy(command.getFailureStrategy());
- workflowInstance.setExecutorId(command.getExecutorId());
-
workflowInstance.setExecutorName(Optional.ofNullable(userMapper.selectById(command.getExecutorId()))
- .map(User::getUserName).orElse(null));
- WarningType warningType = command.getWarningType() == null ?
WarningType.NONE : command.getWarningType();
- workflowInstance.setWarningType(warningType);
- Integer warningGroupId = command.getWarningGroupId() == null ? 0 :
command.getWarningGroupId();
- workflowInstance.setWarningGroupId(warningGroupId);
- workflowInstance.setDryRun(command.getDryRun());
-
- if (command.getScheduleTime() != null) {
- workflowInstance.setScheduleTime(command.getScheduleTime());
- }
- workflowInstance.setCommandStartTime(command.getStartTime());
- workflowInstance.setLocations(workflowDefinition.getLocations());
-
- // reset global params while there are start parameters
- setGlobalParamIfCommanded(workflowDefinition, cmdParam);
-
- // curing global params
- Map<String, String> commandParamMap =
JSONUtils.toMap(command.getCommandParam());
- String timezoneId = null;
- if (commandParamMap != null) {
- timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
- }
-
- String globalParams =
curingGlobalParamsService.curingGlobalParams(workflowInstance.getId(),
- workflowDefinition.getGlobalParamMap(),
- workflowDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(workflowInstance, command),
- workflowInstance.getScheduleTime(), timezoneId);
- workflowInstance.setGlobalParams(globalParams);
-
- // set workflow instance priority
-
workflowInstance.setWorkflowInstancePriority(command.getWorkflowInstancePriority());
-
workflowInstance.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(command.getWorkerGroup()));
-
workflowInstance.setEnvironmentCode(EnvironmentUtils.getEnvironmentCodeOrDefault(command.getEnvironmentCode()));
- workflowInstance.setTimeout(workflowDefinition.getTimeout());
- workflowInstance.setTenantCode(command.getTenantCode());
- return workflowInstance;
- }
-
- @Override
- public void setGlobalParamIfCommanded(WorkflowDefinition
workflowDefinition, Map<String, String> cmdParam) {
-
- // get start params from command param
- Map<String, Property> fatherParam =
curingGlobalParamsService.parseWorkflowFatherParam(cmdParam);
- Map<String, Property> startParamMap = new HashMap<>(fatherParam);
-
- Map<String, Property> currentStartParamMap =
curingGlobalParamsService.parseWorkflowStartParam(cmdParam);
- startParamMap.putAll(currentStartParamMap);
-
- // set start param into global params
- Map<String, String> globalMap = workflowDefinition.getGlobalParamMap();
- List<Property> globalParamList =
workflowDefinition.getGlobalParamList();
- if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) {
- // start param to overwrite global param
- for (Map.Entry<String, String> param : globalMap.entrySet()) {
- String globalKey = param.getKey();
- if (startParamMap.containsKey(globalKey)) {
- String val = startParamMap.get(globalKey).getValue();
- if (val != null) {
- param.setValue(val);
- }
- }
- }
- // start param to create new global param if global not exist
- for (Entry<String, Property> startParam :
startParamMap.entrySet()) {
- if (!globalMap.containsKey(startParam.getKey())) {
- globalMap.put(startParam.getKey(),
startParam.getValue().getValue());
- globalParamList.add(startParam.getValue());
- }
- }
- }
- }
-
/**
* Get workflow runtime tenant
* <p>
@@ -462,309 +263,6 @@ public class ProcessServiceImpl implements ProcessService
{
return tenant.getTenantCode();
}
- /**
- * check command parameters is valid
- *
- * @param command command
- * @param cmdParam cmdParam map
- * @return whether command param is valid
- */
- private Boolean checkCmdParam(Command command, Map<String, String>
cmdParam) {
- if (command.getTaskDependType() == TaskDependType.TASK_ONLY
- || command.getTaskDependType() == TaskDependType.TASK_PRE) {
- if (cmdParam == null
- ||
!cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_START_NODES)
- ||
cmdParam.get(CommandKeyConstants.CMD_PARAM_START_NODES).isEmpty()) {
- log.error("command node depend type is {}, but start nodes is
null ", command.getTaskDependType());
- return false;
- }
- }
- return true;
- }
-
- /**
- * construct workflow instance according to one command.
- *
- * @param command command
- * @param host host
- * @return workflow instance
- */
- @Override
- public @Nullable WorkflowInstance constructWorkflowInstance(Command
command,
- String host)
throws CronParseException {
- WorkflowInstance workflowInstance;
- WorkflowDefinition workflowDefinition;
- CommandType commandType = command.getCommandType();
-
- workflowDefinition =
-
this.findWorkflowDefinition(command.getWorkflowDefinitionCode(),
- command.getWorkflowDefinitionVersion());
- if (workflowDefinition == null) {
- log.error("cannot find the work workflow define! define code :
{}", command.getWorkflowDefinitionCode());
- throw new IllegalArgumentException("Cannot find the workflow
definition for this workflowInstance");
- }
- Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
- if (cmdParam == null) {
- cmdParam = new HashMap<>();
- }
- int workflowInstanceId = command.getWorkflowInstanceId();
- if (workflowInstanceId == 0) {
- workflowInstance = generateNewWorkflowInstance(workflowDefinition,
command, cmdParam);
- } else {
- workflowInstance =
this.findWorkflowInstanceDetailById(workflowInstanceId).orElse(null);
- setGlobalParamIfCommanded(workflowDefinition, cmdParam);
- if (workflowInstance == null) {
- return null;
- }
- }
-
- CommandType commandTypeIfComplement =
getCommandTypeIfComplement(workflowInstance, command);
- // reset global params while repeat running and recover tolerance
fault workflow is needed by cmdParam
- if (commandTypeIfComplement == CommandType.REPEAT_RUNNING ||
- commandTypeIfComplement ==
CommandType.RECOVER_TOLERANCE_FAULT_PROCESS ||
- commandTypeIfComplement == CommandType.RECOVER_SERIAL_WAIT) {
- setGlobalParamIfCommanded(workflowDefinition, cmdParam);
- }
-
- // time zone
- String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
-
- // Recalculate global parameters after rerun.
- String globalParams =
curingGlobalParamsService.curingGlobalParams(workflowInstance.getId(),
- workflowDefinition.getGlobalParamMap(),
- workflowDefinition.getGlobalParamList(),
- commandTypeIfComplement,
- workflowInstance.getScheduleTime(), timezoneId);
- workflowInstance.setGlobalParams(globalParams);
- workflowInstance.setWorkflowDefinition(workflowDefinition);
-
- // reset command parameter
- if (workflowInstance.getCommandParam() != null) {
- Map<String, String> workflowCmdParam =
JSONUtils.toMap(workflowInstance.getCommandParam());
- Map<String, String> finalCmdParam = cmdParam;
- workflowCmdParam.forEach((key, value) -> {
- if (!finalCmdParam.containsKey(key)) {
- finalCmdParam.put(key, value);
- }
- });
- }
- // reset command parameter if sub workflow
- if (cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_SUB_WORKFLOW)) {
- workflowInstance.setCommandParam(command.getCommandParam());
- }
- if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
- log.error("command parameter check failed!");
- return null;
- }
- if (command.getScheduleTime() != null) {
- workflowInstance.setScheduleTime(command.getScheduleTime());
- }
- workflowInstance.setHost(host);
- workflowInstance.setRestartTime(new Date());
- WorkflowExecutionStatus runStatus =
WorkflowExecutionStatus.RUNNING_EXECUTION;
- int runTime = workflowInstance.getRunTimes();
- switch (commandType) {
- case START_PROCESS:
- case DYNAMIC_GENERATION:
- break;
- case START_FAILURE_TASK_PROCESS:
- case RECOVER_SUSPENDED_PROCESS:
- List<TaskInstance> needToStartTaskInstances = taskInstanceDao
-
.queryValidTaskListByWorkflowInstanceId(workflowInstance.getId())
- .stream()
- .filter(taskInstance -> {
- TaskExecutionStatus state =
taskInstance.getState();
- return state == TaskExecutionStatus.FAILURE
- || state == TaskExecutionStatus.PAUSE
- || state ==
TaskExecutionStatus.NEED_FAULT_TOLERANCE
- || state == TaskExecutionStatus.KILL;
- })
- .collect(Collectors.toList());
-
- for (TaskInstance taskInstance : needToStartTaskInstances) {
- initTaskInstance(taskInstance);
- }
- String startTaskInstanceIds = needToStartTaskInstances.stream()
- .map(TaskInstance::getId)
- .map(String::valueOf)
- .collect(Collectors.joining(Constants.COMMA));
-
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
startTaskInstanceIds);
-
workflowInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
- workflowInstance.setRunTimes(runTime + 1);
- break;
- case START_CURRENT_TASK_PROCESS:
- break;
- case RECOVER_TOLERANCE_FAULT_PROCESS:
- // recover tolerance fault workflow
- // If the workflow instance is in ready state, we will change
to running, this can avoid the workflow
- // instance
- // status is not correct with taskInstance status
- if (workflowInstance.getState() ==
WorkflowExecutionStatus.READY_PAUSE
- || workflowInstance.getState() ==
WorkflowExecutionStatus.READY_STOP) {
- // todo: If we handle the ready state in
WorkflowExecuteRunnable then we can remove below code
-
workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
- }
- workflowInstance.setRecovery(Flag.YES);
- workflowInstance.setRunTimes(runTime + 1);
- runStatus = workflowInstance.getState();
- break;
- case COMPLEMENT_DATA:
- // delete all the valid tasks when complement data if id is
not null
- if (workflowInstance.getId() != null) {
- List<TaskInstance> taskInstanceList =
-
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstance.getId());
- for (TaskInstance taskInstance : taskInstanceList) {
- taskInstance.setFlag(Flag.NO);
- taskInstanceDao.updateById(taskInstance);
- }
- }
- break;
- case REPEAT_RUNNING:
- // delete the recover task names from command parameter
- if
(cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING))
{
-
cmdParam.remove(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING);
-
workflowInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
- }
- // delete the StartNodeList from command parameter if last
execution is only execute specified tasks
- if
(workflowInstance.getCommandType().equals(CommandType.EXECUTE_TASK)) {
- cmdParam.remove(CommandKeyConstants.CMD_PARAM_START_NODES);
-
workflowInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
-
workflowInstance.setTaskDependType(command.getTaskDependType());
- }
- // delete all the valid tasks when repeat running
- List<TaskInstance> validTaskList =
-
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstance.getId());
- for (TaskInstance taskInstance : validTaskList) {
- taskInstance.setFlag(Flag.NO);
- taskInstanceDao.updateById(taskInstance);
- }
- workflowInstance.setStartTime(new Date());
-
workflowInstance.setRestartTime(workflowInstance.getStartTime());
- workflowInstance.setEndTime(null);
- workflowInstance.setRunTimes(runTime + 1);
- initComplementDataParam(workflowDefinition, workflowInstance,
cmdParam);
- break;
- case SCHEDULER:
- break;
- case EXECUTE_TASK:
- workflowInstance.setRunTimes(runTime + 1);
-
workflowInstance.setTaskDependType(command.getTaskDependType());
-
workflowInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
- break;
- default:
- break;
- }
- workflowInstance.setStateWithDesc(runStatus, commandType.getDescp());
- return workflowInstance;
- }
-
- /**
- * return complement data if the workflow start with complement data
- *
- * @param workflowInstance workflowInstance
- * @param command command
- * @return command type
- */
- private CommandType getCommandTypeIfComplement(WorkflowInstance
workflowInstance, Command command) {
- if (CommandType.COMPLEMENT_DATA ==
workflowInstance.getCmdTypeIfComplement()) {
- return CommandType.COMPLEMENT_DATA;
- } else {
- return command.getCommandType();
- }
- }
-
- /**
- * initialize complement data parameters
- *
- * @param workflowDefinition workflowDefinition
- * @param workflowInstance workflowInstance
- * @param cmdParam cmdParam
- */
- private void initComplementDataParam(WorkflowDefinition workflowDefinition,
- WorkflowInstance workflowInstance,
- Map<String, String> cmdParam) throws
CronParseException {
- if (!workflowInstance.isComplementData()) {
- return;
- }
-
- Date start =
DateUtils.stringToDate(cmdParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE));
- Date end =
DateUtils.stringToDate(cmdParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE));
- List<Date> complementDate = Lists.newLinkedList();
- if (start != null && end != null) {
- List<Schedule> listSchedules =
-
queryReleaseSchedulerListByWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
- complementDate = CronUtils.getSelfFireDateList(start, end,
listSchedules);
- }
- if
(cmdParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
- complementDate = CronUtils.getSelfScheduleDateList(cmdParam);
- }
-
- if (CollectionUtils.isNotEmpty(complementDate) && Flag.NO ==
workflowInstance.getIsSubWorkflow()) {
- workflowInstance.setScheduleTime(complementDate.get(0));
- }
-
- // time zone
- String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
-
- String globalParams =
curingGlobalParamsService.curingGlobalParams(workflowInstance.getId(),
- workflowDefinition.getGlobalParamMap(),
- workflowDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA,
workflowInstance.getScheduleTime(), timezoneId);
- workflowInstance.setGlobalParams(globalParams);
- }
-
- /**
- * initialize task instance
- *
- * @param taskInstance taskInstance
- */
- private void initTaskInstance(TaskInstance taskInstance) {
-
- if (!TaskTypeUtils.isSubWorkflowTask(taskInstance.getTaskType())
- && (taskInstance.getState().isKill() ||
taskInstance.getState().isFailure())) {
- taskInstance.setFlag(Flag.NO);
- taskInstanceDao.updateById(taskInstance);
- return;
- }
- taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
- taskInstanceDao.updateById(taskInstance);
- }
-
- /**
- * update {@link ResourceInfo} by given original ResourceInfo
- *
- * @param res origin resource info
- * @return {@link ResourceInfo}
- */
- protected ResourceInfo updateResourceInfo(int task_id, ResourceInfo res) {
- ResourceInfo resourceInfo = null;
- // only if mainJar is not null and does not contain "resourceName"
field
- if (res != null) {
- String resourceFullName = res.getResourceName();
- if (StringUtils.isBlank(resourceFullName)) {
- log.error("invalid resource full name, {}", resourceFullName);
- return new ResourceInfo();
- }
- resourceInfo = new ResourceInfo();
- resourceInfo.setResourceName(resourceFullName);
- log.info("updated resource info {}",
- JSONUtils.toJsonString(resourceInfo));
- }
- return resourceInfo;
- }
-
- /**
- * delete work workflow map by parent workflow id
- *
- * @param parentWorkflowId parentWorkflowId
- * @return delete workflow map result
- */
- @Override
- public int deleteWorkflowMapByParentId(int parentWorkflowId) {
- return
workflowInstanceRelationMapper.deleteByParentWorkflowInstanceId(parentWorkflowId);
-
- }
-
/**
* find sub workflow instance
*
@@ -802,71 +300,6 @@ public class ProcessServiceImpl implements ProcessService {
return workflowInstance;
}
- /**
- * for show in page of taskInstance
- */
- @Override
- public void changeOutParam(TaskInstance taskInstance) {
- if (Strings.isNullOrEmpty(taskInstance.getVarPool())) {
- return;
- }
- List<Property> properties =
JSONUtils.toList(taskInstance.getVarPool(), Property.class);
- if (CollectionUtils.isEmpty(properties)) {
- return;
- }
- // if the result more than one line,just get the first .
- Map<String, Object> taskParams =
- JSONUtils.parseObject(taskInstance.getTaskParams(), new
TypeReference<Map<String, Object>>() {
- });
- Object localParams = taskParams.get(LOCAL_PARAMS);
- if (localParams == null) {
- return;
- }
- List<Property> allParam =
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
- Map<String, String> outProperty = new HashMap<>();
- for (Property info : properties) {
- if (info.getDirect() == Direct.OUT) {
- outProperty.put(info.getProp(), info.getValue());
- }
- }
- for (Property info : allParam) {
- if (info.getDirect() == Direct.OUT) {
- String paramName = info.getProp();
- info.setValue(outProperty.get(paramName));
- }
- }
- taskParams.put(LOCAL_PARAMS, allParam);
- taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
- }
-
- /**
- * convert integer list to string list
- *
- * @param intList intList
- * @return string list
- */
- private List<String> convertIntListToString(List<Integer> intList) {
- if (intList == null) {
- return new ArrayList<>();
- }
- List<String> result = new ArrayList<>(intList.size());
- for (Integer intVar : intList) {
- result.add(String.valueOf(intVar));
- }
- return result;
- }
-
- /**
- * query schedule by id
- *
- * @param id id
- * @return schedule
- */
- @Override
- public Schedule querySchedule(int id) {
- return scheduleMapper.selectById(id);
- }
-
/**
* query Schedule by workflowDefinitionCode
*
@@ -900,23 +333,6 @@ public class ProcessServiceImpl implements ProcessService {
return userMapper.selectById(userId);
}
- /**
- * format task app id in task instance
- */
- @Override
- public String formatTaskAppId(TaskInstance taskInstance) {
- WorkflowInstance workflowInstance =
findWorkflowInstanceById(taskInstance.getWorkflowInstanceId());
- if (workflowInstance == null) {
- return "";
- }
- WorkflowDefinition definition =
findWorkflowDefinition(workflowInstance.getWorkflowDefinitionCode(),
- workflowInstance.getWorkflowDefinitionVersion());
- if (definition == null) {
- return "";
- }
- return String.format("%s_%s_%s", definition.getId(),
workflowInstance.getId(), taskInstance.getId());
- }
-
/**
* list unauthorized
*
@@ -1015,18 +431,6 @@ public class ProcessServiceImpl implements ProcessService
{
return taskDefinitionMapper.updateById(taskDefinitionUpdate);
}
- /**
- * get resource ids
- *
- * @param taskDefinition taskDefinition
- * @return resource ids
- */
- @Deprecated
- @Override
- public String getResourceIds(TaskDefinition taskDefinition) {
- return "";
- }
-
@Override
public int saveTaskDefine(User operator, long projectCode,
List<TaskDefinitionLog> taskDefinitionLogs,
Boolean syncDefine) {
@@ -1327,30 +731,6 @@ public class ProcessServiceImpl implements ProcessService
{
return taskNodeList;
}
- @Override
- public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId,
- String taskName,
- Integer taskGroupId,
- Integer workflowInstanceId,
- Integer taskGroupPriority,
- TaskGroupQueueStatus
status) {
- Date now = new Date();
- TaskGroupQueue taskGroupQueue = TaskGroupQueue.builder()
- .taskId(taskInstanceId)
- .taskName(taskName)
- .groupId(taskGroupId)
- .workflowInstanceId(workflowInstanceId)
- .priority(taskGroupPriority)
- .status(status)
- .forceStart(Flag.NO.getCode())
- .inQueue(Flag.NO.getCode())
- .createTime(now)
- .updateTime(now)
- .build();
- taskGroupQueueMapper.insert(taskGroupQueue);
- return taskGroupQueue;
- }
-
/**
* find k8s config yaml by clusterName
*
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 0177db26a2..89e0d3c187 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -17,22 +17,17 @@
package org.apache.dolphinscheduler.service.process;
-import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
-import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
-import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelationLog;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
@@ -44,18 +39,13 @@ import
org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.model.TaskNode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -118,25 +108,6 @@ public class ProcessServiceTest {
Assertions.assertEquals(user, processService.getUserById(123));
}
- @Test
- public void testFormatTaskAppId() {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(333);
- taskInstance.setWorkflowInstanceId(222);
-
when(processService.findWorkflowInstanceById(taskInstance.getWorkflowInstanceId())).thenReturn(null);
- Assertions.assertEquals("",
processService.formatTaskAppId(taskInstance));
-
- WorkflowDefinition workflowDefinition = new WorkflowDefinition();
- workflowDefinition.setId(111);
- WorkflowInstance workflowInstance = new WorkflowInstance();
- workflowInstance.setId(222);
- workflowInstance.setWorkflowDefinitionVersion(1);
- workflowInstance.setWorkflowDefinitionCode(1L);
-
when(processService.findWorkflowInstanceById(taskInstance.getWorkflowInstanceId()))
- .thenReturn(workflowInstance);
- Assertions.assertEquals("",
processService.formatTaskAppId(taskInstance));
- }
-
@Test
public void testFindAllSubWorkflowDefinitionCode() {
int parentProcessDefineId = 1;
@@ -183,32 +154,6 @@ public class ProcessServiceTest {
Assertions.assertEquals(0,
processService.switchVersion(workflowDefinition, processDefinitionLog));
}
- @Test
- public void testSetGlobalParamIfCommanded() {
- WorkflowDefinition workflowDefinition = new WorkflowDefinition();
- String globalParams =
-
"[{\"prop\":\"global_param\",\"value\":\"4\",\"direct\":\"IN\",\"type\":\"VARCHAR\"},{\"prop\":\"O_ERRCODE\",\"value\":\"\",\"direct\":\"OUT\",\"type\":\"VARCHAR\"}]";
- workflowDefinition.setGlobalParams(globalParams);
- Map<String, String> globalParamMap =
workflowDefinition.getGlobalParamMap();
- Assertions.assertTrue(globalParamMap.size() == 2);
- Assertions.assertTrue(workflowDefinition.getGlobalParamList().size()
== 2);
-
- HashMap<String, String> startParams = new HashMap<>();
- String expectValue = "6";
- startParams.put("global_param", expectValue);
- HashMap<String, String> commandParams = new HashMap<>();
- commandParams.put(CMD_PARAM_START_PARAMS,
JSONUtils.toJsonString(startParams));
- Map<String, Property> mockStartParams = new HashMap<>();
-
- mockStartParams.put("global_param", new Property("global_param",
Direct.IN,
-
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR,
startParams.get("global_param")));
-
when(curingGlobalParamsService.parseWorkflowStartParam(commandParams)).thenReturn(mockStartParams);
-
- processService.setGlobalParamIfCommanded(workflowDefinition,
commandParams);
-
Assertions.assertTrue(globalParamMap.get("global_param").equals(expectValue));
- Assertions.assertTrue(globalParamMap.containsKey("O_ERRCODE"));
- }
-
@Test
public void testSaveTaskDefine() {
User operator = new User();
@@ -299,62 +244,4 @@ public class ProcessServiceTest {
Assertions.assertEquals(1,
stringTaskNodeTaskNodeRelationDAG.getNodesCount());
}
- @Test
- public void testChangeOutParam() {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setWorkflowInstanceId(62);
- WorkflowInstance workflowInstance = new WorkflowInstance();
- workflowInstance.setId(62);
-
taskInstance.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
-
taskInstance.setTaskParams("{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select
id from tb_test limit 1\","
- +
"\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\","
- +
"\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}],"
- +
"\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
- + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}");
- processService.changeOutParam(taskInstance);
- }
-
- @Test
- public void testUpdateResourceInfo() throws Exception {
- // test if input is null
- ResourceInfo resourceInfoNull = null;
- ResourceInfo updatedResourceInfo1 =
processService.updateResourceInfo(0, resourceInfoNull);
- Assertions.assertNull(updatedResourceInfo1);
-
- // test if resource id less than 1
- ResourceInfo resourceInfoVoid = new ResourceInfo();
- ResourceInfo updatedResourceInfo2 =
processService.updateResourceInfo(0, resourceInfoVoid);
- Assertions.assertNull(updatedResourceInfo2.getResourceName());
-
- // test normal situation
- ResourceInfo resourceInfoNormal = new ResourceInfo();
- resourceInfoNormal.setResourceName("/test.txt");
-
- ResourceInfo updatedResourceInfo3 =
processService.updateResourceInfo(0, resourceInfoNormal);
-
- Assertions.assertEquals("/test.txt",
updatedResourceInfo3.getResourceName());
-
- }
-
- @Test
- public void testCreateTaskGroupQueue() {
-
when(taskGroupQueueMapper.insert(Mockito.any(TaskGroupQueue.class))).thenReturn(1);
- TaskGroupQueue taskGroupQueue =
- processService.insertIntoTaskGroupQueue(1, "task name", 1, 1,
1, TaskGroupQueueStatus.WAIT_QUEUE);
- Assertions.assertNotNull(taskGroupQueue);
- }
-
- private TaskGroupQueue getTaskGroupQueue() {
- TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
- taskGroupQueue.setTaskName("task name");
- taskGroupQueue.setId(1);
- taskGroupQueue.setGroupId(1);
- taskGroupQueue.setTaskId(1);
- taskGroupQueue.setPriority(1);
- taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
- Date date = new Date(System.currentTimeMillis());
- taskGroupQueue.setUpdateTime(date);
- taskGroupQueue.setCreateTime(date);
- return taskGroupQueue;
- }
}