This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5f64badb45 [Fix-17239][Dependent] Dependent check get wrong result in
manual running execution type (#17240)
5f64badb45 is described below
commit 5f64badb45770eb4895e64b88bc6b16c1f2d804f
Author: xiangzihao <[email protected]>
AuthorDate: Thu Jun 5 09:53:58 2025 +0800
[Fix-17239][Dependent] Dependent check get wrong result in manual running
execution type (#17240)
---
.../dao/mapper/WorkflowInstanceMapper.java | 52 ++++++++++++----------
.../dao/repository/WorkflowInstanceDao.java | 2 +
.../repository/impl/WorkflowInstanceDaoImpl.java | 10 +++++
.../dao/mapper/WorkflowInstanceMapper.xml | 18 ++++++++
.../server/master/utils/DependentExecute.java | 19 +++++---
5 files changed, 72 insertions(+), 29 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
index a9f743f49a..30484635d2 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java
@@ -65,6 +65,7 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
*/
List<WorkflowInstance> queryMainWorkflowByHostAndStatus(@Param("host")
String host,
@Param("states")
int[] stateArray);
+
/**
* query workflow instance host by stateArray
*
@@ -108,15 +109,15 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
/**
* workflow instance page
*
- * @param page page
- * @param projectCode projectCode
+ * @param page page
+ * @param projectCode projectCode
* @param workflowDefinitionCode workflowDefinitionCode
- * @param searchVal searchVal
- * @param executorName executorName
- * @param statusArray statusArray
- * @param host host
- * @param startTime startTime
- * @param endTime endTime
+ * @param searchVal searchVal
+ * @param executorName executorName
+ * @param statusArray statusArray
+ * @param host host
+ * @param startTime startTime
+ * @param endTime endTime
* @return workflow instance page
*/
IPage<WorkflowInstance>
queryWorkflowInstanceListPaging(Page<WorkflowInstance> page,
@@ -186,7 +187,7 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
* query workflow instance by workflowDefinitionCode
*
* @param workflowDefinitionCode workflowDefinitionCode
- * @param size size
+ * @param size size
* @return workflow instance list
*/
List<WorkflowInstance>
queryByWorkflowDefinitionCode(@Param("workflowDefinitionCode") Long
workflowDefinitionCode,
@@ -196,10 +197,10 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
* query last scheduler workflow instance
*
* @param workflowDefinitionCode definitionCode
- * @param taskDefinitionCode definitionCode
- * @param startTime startTime
- * @param endTime endTime
- * @param testFlag testFlag
+ * @param taskDefinitionCode definitionCode
+ * @param startTime startTime
+ * @param endTime endTime
+ * @param testFlag testFlag
* @return workflow instance
*/
WorkflowInstance
queryLastSchedulerWorkflow(@Param("workflowDefinitionCode") Long
workflowDefinitionCode,
@@ -212,10 +213,10 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
* query last manual workflow instance
*
* @param workflowDefinitionCode workflowDefinitionCode
- * @param taskCode taskCode
- * @param startTime startTime
- * @param endTime endTime
- * @param testFlag testFlag
+ * @param taskCode taskCode
+ * @param startTime startTime
+ * @param endTime endTime
+ * @param testFlag testFlag
* @return workflow instance
*/
WorkflowInstance queryLastManualWorkflow(@Param("workflowDefinitionCode")
Long workflowDefinitionCode,
@@ -224,6 +225,11 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
@Param("endTime") Date endTime,
@Param("testFlag") int testFlag);
+ WorkflowInstance queryLastRunningWorkflow(@Param("workflowDefinitionCode")
Long workflowDefinitionCode,
+ @Param("startTime") Date
startTime,
+ @Param("endTime") Date endTime,
+ @Param("states") int[]
stateArray);
+
/**
* query first schedule workflow instance
*
@@ -261,7 +267,7 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
* query workflow instance by workflowDefinitionCode and stateArray
*
* @param workflowDefinitionCode workflowDefinitionCode
- * @param states states array
+ * @param states states array
* @return workflow instance list
*/
@@ -275,12 +281,12 @@ public interface WorkflowInstanceMapper extends
BaseMapper<WorkflowInstance> {
/**
* Filter workflow instance
*
- * @param page page
+ * @param page page
* @param workflowDefinitionCode workflowDefinitionCode
- * @param name name
- * @param host host
- * @param startTime startTime
- * @param endTime endTime
+ * @param name name
+ * @param host host
+ * @param startTime startTime
+ * @param endTime endTime
* @return workflow instance IPage
*/
IPage<WorkflowInstance>
queryWorkflowInstanceListV2Paging(Page<WorkflowInstance> page,
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
index 5a2fb4e147..94a6a85c6c 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java
@@ -68,6 +68,8 @@ public interface WorkflowInstanceDao extends
IDao<WorkflowInstance> {
WorkflowInstance queryLastManualWorkflowInterval(Long definitionCode, Long
taskCode, DateInterval dateInterval,
int testFlag);
+ WorkflowInstance queryLastRunningWorkflowInterval(Long definitionCode,
DateInterval dateInterval);
+
/**
* query first schedule workflow instance
*
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
index 996678c581..1446047e2b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java
@@ -120,6 +120,16 @@ public class WorkflowInstanceDaoImpl extends
BaseDao<WorkflowInstance, WorkflowI
testFlag);
}
+ @Override
+ public WorkflowInstance queryLastRunningWorkflowInterval(Long
definitionCode, DateInterval dateInterval) {
+ int[] runningStateArray = new
int[]{WorkflowExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ WorkflowExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ WorkflowExecutionStatus.READY_PAUSE.ordinal(),
+ WorkflowExecutionStatus.READY_STOP.ordinal()};
+ return mybatisMapper.queryLastRunningWorkflow(definitionCode,
dateInterval.getStartTime(),
+ dateInterval.getEndTime(), runningStateArray);
+ }
+
/**
* query first schedule process instance
*
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
index 83fba41769..a52e0dcd02 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml
@@ -249,6 +249,24 @@
order by t1.end_time desc limit 1
</select>
+ <select id="queryLastRunningWorkflow"
resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_workflow_instance
+ where workflow_definition_code=#{workflowDefinitionCode}
+ <if test="states !=null and states.length != 0">
+ and state in
+ <foreach collection="states" item="i" index="index" open="("
separator="," close=")">
+ #{i}
+ </foreach>
+ </if>
+ <if test="startTime!=null and endTime != null ">
+ and ((schedule_time <![CDATA[ >= ]]> #{startTime} and
schedule_time <![CDATA[ <= ]]> #{endTime})
+ or (start_time <![CDATA[ >= ]]> #{startTime} and start_time
<![CDATA[ <= ]]> #{endTime}))
+ </if>
+ order by start_time desc limit 1
+ </select>
+
<select id="queryFirstScheduleWorkflowInstance"
resultType="org.apache.dolphinscheduler.dao.entity.WorkflowInstance">
select
<include refid="baseSql"/>
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
index 300a989b42..9389accb35 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
@@ -155,7 +155,7 @@ public class DependentExecute {
DependResult result = DependResult.FAILED;
for (DateInterval dateInterval : dateIntervals) {
WorkflowInstance workflowInstance =
-
findLastWorkflowInterval(dependentItem.getDefinitionCode(),
dependentItem.getDepTaskCode(),
+
findDependentWorkflowCandidate(dependentItem.getDefinitionCode(),
dependentItem.getDepTaskCode(),
dateInterval, testFlag);
if (workflowInstance == null) {
return DependResult.WAITING;
@@ -314,17 +314,24 @@ public class DependentExecute {
}
/**
- * find the last one workflow instance that :
- * 1. manual run and finish between the interval
- * 2. schedule run and schedule time between the interval
+ * find the last one workflow instance that:
+ * 1. running workflow instance in the date interval
+ * 2. manual run and finish between the interval
+ * 3. schedule run and schedule time between the interval
*
* @param definitionCode definition code
* @param taskCode task code
* @param dateInterval date interval
* @return workflowInstance
*/
- private WorkflowInstance findLastWorkflowInterval(Long definitionCode,
Long taskCode, DateInterval dateInterval,
- int testFlag) {
+ private WorkflowInstance findDependentWorkflowCandidate(Long
definitionCode, Long taskCode,
+ DateInterval
dateInterval,
+ int testFlag) {
+ WorkflowInstance runningWorkflow =
+
workflowInstanceDao.queryLastRunningWorkflowInterval(definitionCode,
dateInterval);
+ if (runningWorkflow != null) {
+ return runningWorkflow;
+ }
WorkflowInstance lastSchedulerWorkflowInstance =
workflowInstanceDao.queryLastSchedulerWorkflowInterval(definitionCode,
taskCode, dateInterval,