ruanwenjun commented on code in PR #17796:
URL:
https://github.com/apache/dolphinscheduler/pull/17796#discussion_r2623065956
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/WorkerGroupNotFoundException.java:
##########
@@ -19,7 +19,7 @@
public class WorkerGroupNotFoundException extends TaskDispatchException {
- public WorkerGroupNotFoundException(String workerGroup) {
- super("Cannot find worker group: " + workerGroup);
+ public WorkerGroupNotFoundException(String message) {
Review Comment:
Why was this changed?
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java:
##########
@@ -126,6 +126,11 @@ public class TaskExecutionContext implements Serializable {
private boolean failover;
+ /**
+ * Timestamp (ms) when the task was first enqueued for dispatch.
+ */
+ private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis();
Review Comment:
```suggestion
private final long firstDispatchTime = System.currentTimeMillis();
```
This is more like the creation time of the `taskExecutionContext`.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java:
##########
@@ -84,23 +95,77 @@ public void run() {
}
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ final int taskId = taskExecutionRunnable.getId();
+ final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
+ final long timeoutMs = this.dispatchTimeout.toMillis();
Review Comment:
It's better to store the milliseconds rather than the `Duration`; then you can
avoid calling `dispatchTimeout.toMillis()` here.
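A minimal sketch of what this could look like, assuming the dispatcher still receives a `Duration` from the configuration. The class name `DispatchTimeoutHolder` and the method `isExceeded` are hypothetical stand-ins, not the actual `WorkerGroupDispatcher` code:

```java
import java.time.Duration;

// Hypothetical, simplified stand-in for WorkerGroupDispatcher; only the timeout handling is shown.
final class DispatchTimeoutHolder {

    // Converted once at construction, so the dispatch path only reads a plain long.
    private final long maxTaskDispatchMillis;

    DispatchTimeoutHolder(Duration maxTaskDispatchDuration) {
        this.maxTaskDispatchMillis = maxTaskDispatchDuration.toMillis();
    }

    // Returns true when more than the configured limit has passed since the first dispatch attempt.
    boolean isExceeded(long firstDispatchTimeMillis, long nowMillis) {
        return nowMillis - firstDispatchTimeMillis > maxTaskDispatchMillis;
    }
}
```

With this shape, `doDispatchTask` would not need a local `timeoutMs` variable at all.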
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java:
##########
@@ -74,6 +74,12 @@ public class MasterConfig implements Validator {
*/
private String masterRegistryPath;
+ /**
+ * Maximum time allowed for a task to be successfully dispatched.
+ * Default: 5 minutes.
+ */
+ private Duration dispatchTimeout = Duration.ofMinutes(5);
Review Comment:
```suggestion
private Duration maxTaskDispatchDuration;
```
`dispatchTimeout` might be confused with the timeout of a single RPC call made
when dispatching a task. The default value should be null or a large duration,
to stay compatible with the previous behavior.
You also need to add this to the documentation.
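As a rough illustration of the null default, the check could treat a missing value as "no limit", which keeps the old retry-forever behavior. The helper class `MaxTaskDispatchDurationSketch` and the method `hasExceededLimit` are hypothetical names used only for this sketch:

```java
import java.time.Duration;

// Hypothetical helper; not part of MasterConfig or WorkerGroupDispatcher.
final class MaxTaskDispatchDurationSketch {

    private MaxTaskDispatchDurationSketch() {
    }

    /**
     * @param maxTaskDispatchDuration configured limit, or null when no limit is configured
     * @param elapsedMillis           time spent so far trying to dispatch the task
     * @return true only when a limit is configured and has been exceeded
     */
    static boolean hasExceededLimit(Duration maxTaskDispatchDuration, long elapsedMillis) {
        if (maxTaskDispatchDuration == null) {
            // No limit configured: keep retrying, as before this change.
            return false;
        }
        return elapsedMillis > maxTaskDispatchDuration.toMillis();
    }
}
```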
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/WorkerNotFoundException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.exception.dispatch;
+
+public class WorkerNotFoundException extends TaskDispatchException {
+
+ public WorkerNotFoundException(String message) {
Review Comment:
```suggestion
public NoAvailableWorkerException(String workerGroup) {
super("Cannot find available worker under worker group: " +
workerGroup);
}
```
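For reference, the whole class could look roughly like the sketch below. The `TaskDispatchException` declared here is a simplified stand-in so the example compiles on its own; the real base class in the project may differ:

```java
// Simplified stand-in for the project's TaskDispatchException (assumption: it accepts a message).
class TaskDispatchException extends Exception {

    TaskDispatchException(String message) {
        super(message);
    }
}

// The exception builds its own message from the worker group name,
// so callers only pass the worker group and cannot produce inconsistent wording.
final class NoAvailableWorkerException extends TaskDispatchException {

    NoAvailableWorkerException(String workerGroup) {
        super("Cannot find available worker under worker group: " + workerGroup);
    }
}
```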
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java:
##########
@@ -84,23 +95,77 @@ public void run() {
}
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ final int taskId = taskExecutionRunnable.getId();
Review Comment:
```suggestion
final int taskExecutionRunnableId = taskExecutionRunnable.getId();
```
This avoids confusion with the task definition id.
##########
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java:
##########
@@ -46,7 +47,8 @@ class WorkerGroupDispatcherTest {
@BeforeEach
void setUp() {
taskExecutorClient = mock(ITaskExecutorClient.class);
- dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient);
+ final MasterConfig masterConfig = new MasterConfig();
+ dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, masterConfig.getDispatchTimeout());
Review Comment:
You should add a test case that validates this behavior.
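One way to make the timeout path testable without sleeping is to inject the clock. The sketch below is plain Java with hypothetical names (`DispatchDecision`, `Outcome`, `onDispatchFailure`); it is not the real dispatcher and only illustrates the retry-versus-fail decision that such a test would cover:

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical model of the decision taken when a dispatch attempt fails.
final class DispatchDecision {

    enum Outcome { RETRY, FAIL }

    private final long maxTaskDispatchMillis;
    // Injected clock, so a test can move time forward deterministically.
    private final LongSupplier clock;

    DispatchDecision(Duration maxTaskDispatchDuration, LongSupplier clock) {
        this.maxTaskDispatchMillis = maxTaskDispatchDuration.toMillis();
        this.clock = clock;
    }

    // FAIL once the elapsed time since the first dispatch exceeds the limit, otherwise RETRY.
    Outcome onDispatchFailure(long firstDispatchTimeMillis) {
        long elapsed = clock.getAsLong() - firstDispatchTimeMillis;
        return elapsed > maxTaskDispatchMillis ? Outcome.FAIL : Outcome.RETRY;
    }
}
```

A test would then construct the object with a fake clock, assert `RETRY` while the limit has not been reached, advance the clock past the limit, and assert `FAIL`.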
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java:
##########
@@ -84,23 +95,77 @@ public void run() {
}
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ final int taskId = taskExecutionRunnable.getId();
+ final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
+ final long timeoutMs = this.dispatchTimeout.toMillis();
try {
- if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) {
+ if (!waitingDispatchTaskIds.remove(taskId)) {
log.info(
"The task: {} doesn't exist in waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
- taskExecutionRunnable.getId());
+ taskId);
return;
}
taskExecutorClient.dispatch(taskExecutionRunnable);
- } catch (Exception e) {
+ } catch (TaskDispatchException ex) {
+ // Checks whether the given task has exceeded its allowed dispatch timeout.
+ long elapsed = System.currentTimeMillis() - taskExecutionContext.getFirstDispatchEnqueueTimeMs();
+ if (elapsed > timeoutMs) {
+ handleDispatchFailure(taskExecutionRunnable, ex, elapsed, timeoutMs);
+ return;
+ }
+
// If dispatch failed, will put the task back to the queue
// The task will be dispatched after waiting time.
// the waiting time will increase multiple of times, but will not exceed 60 seconds
- long waitingTimeMills = Math.min(
+ long waitingTimeMillis = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
- dispatchTask(taskExecutionRunnable, waitingTimeMills);
- log.error("Dispatch Task: {} failed will retry after: {}/ms", taskExecutionRunnable.getId(),
- waitingTimeMills, e);
+ dispatchTask(taskExecutionRunnable, waitingTimeMillis);
+ log.warn("Dispatch Task: {} failed will retry after: {}/ms", taskId,
+ waitingTimeMillis, ex);
+ }
+ }
+
+ /**
+ * Marks the specified task as fatally failed due to an unrecoverable dispatch error,such as timeout or persistent client failure.
+ * Once this method is called, the task is considered permanently failed and will not be retried.
+ *
+ * @param taskExecutionRunnable the task to mark as fatally failed; must not be null
+ * @param exception the dispatch exception that triggered this failure handling; must not be null
+ * @param elapsed the time (in milliseconds) already spent attempting to dispatch the task
+ * @param timeoutMs the configured dispatch timeout threshold (in milliseconds)
+ */
+ private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, TaskDispatchException exception,
Review Comment:
Please don't print the `taskId` and `workflowId` here; all ids should
already be added by MDC. We only need to print the exception here, since the
`exception` already contains the failure message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.