This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 6d1874ecf2 [Fix-17831] Fix incorrect parallelism num when complement
data in parallel excution mode (#17853)
6d1874ecf2 is described below
commit 6d1874ecf2531b969f38b3c1ca2bdf18549b20da
Author: xiangzihao <[email protected]>
AuthorDate: Thu Jan 8 15:36:49 2026 +0800
[Fix-17831] Fix incorrect parallelism num when complement data in parallel
excution mode (#17853)
---
.../workflow/BackfillWorkflowExecutorDelegate.java | 29 ++++++++++++++++++++--
1 file changed, 27 insertions(+), 2 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
index 3f695e810c..0c684f2876 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
@@ -35,6 +35,7 @@ import
org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.ZonedDateTime;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -95,9 +96,9 @@ public class BackfillWorkflowExecutorDelegate implements
IExecutorDelegate<Backf
expectedParallelismNumber = listDate.size();
}
- log.info("In parallel mode, current expectedParallelismNumber:{}",
expectedParallelismNumber);
+ log.info("In parallel mode, current expectedParallelismNumber: {}",
expectedParallelismNumber);
final List<Integer> workflowInstanceIdList = Lists.newArrayList();
- for (List<ZonedDateTime> stringDate : Lists.partition(listDate,
expectedParallelismNumber)) {
+ for (List<ZonedDateTime> stringDate : splitDateTime(listDate,
expectedParallelismNumber)) {
final Integer workflowInstanceId = doBackfillWorkflow(
backfillWorkflowDTO,
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
@@ -106,6 +107,30 @@ public class BackfillWorkflowExecutorDelegate implements
IExecutorDelegate<Backf
return workflowInstanceIdList;
}
+ /**
+ * split date time list into n parts, the last part may be larger if not
divisible
+ */
+ private List<List<ZonedDateTime>> splitDateTime(List<ZonedDateTime>
dateTimeList, int numParts) {
+ List<List<ZonedDateTime>> result = new ArrayList<>();
+ int n = dateTimeList.size();
+
+ int baseSize = n / numParts;
+ int remainder = n % numParts;
+
+ int start = 0;
+ for (int i = 0; i < numParts; i++) {
+ int currentSize = baseSize;
+ if (i == numParts - 1) {
+ currentSize += remainder;
+ }
+ List<ZonedDateTime> part = dateTimeList.subList(start, start +
currentSize);
+ result.add(part);
+ start += currentSize;
+ }
+
+ return result;
+ }
+
private Integer doBackfillWorkflow(final BackfillWorkflowDTO
backfillWorkflowDTO,
final List<String> backfillTimeList) {
final Server masterServer =
registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);