ruanwenjun commented on code in PR #17037:
URL:
https://github.com/apache/dolphinscheduler/pull/17037#discussion_r1980534651
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java:
##########
@@ -31,18 +32,19 @@
/**
* The class is used to store {@link ITaskExecutionRunnable} which needs to be
dispatched. The {@link ITaskExecutionRunnable}
- * will be stored in {@link PriorityDelayQueue}, if the {@link
ITaskExecutionRunnable}'s delay time is 0, then it will be
+ * will be stored in {@link DelayQueue}, if the {@link
ITaskExecutionRunnable}'s delay time is 0, then it will be
* consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
* <p>
- * The order of {@link ITaskExecutionRunnable} in the {@link
PriorityDelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
+ * The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is
determined by {@link ITaskExecutionRunnable#compareTo}.
*/
@Slf4j
@Component
public class GlobalTaskDispatchWaitingQueue {
private final Set<Integer> waitingTaskInstanceIds =
ConcurrentHashMap.newKeySet();
- private final PriorityDelayQueue<DelayEntry<ITaskExecutionRunnable>>
priorityDelayQueue =
- new PriorityDelayQueue<>();
+
+ private final PriorityBlockingQueue<DelayEntry<ITaskExecutionRunnable>>
priorityQueue =
Review Comment:
Shouldn't you use `DelayQueue` here? Otherwise, how can we make sure an item
is only polled out after its delay time has elapsed?
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayEntry.java:
##########
@@ -17,25 +17,24 @@
package org.apache.dolphinscheduler.server.master.runner.queue;
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
-import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
-public class PriorityDelayQueue<V extends DelayEntry> {
+public class PriorityDelayEntry<V extends Comparable<V>> extends DelayEntry {
Review Comment:
Don't change this class.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupQueueLooper.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.queue.WorkerGroupQueueMap;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class WorkerGroupQueueLooper extends BaseDaemonThread implements
AutoCloseable {
+
+ @Autowired
+ private WorkerGroupQueueMap workerGroupQueueMap;
+
+ @Autowired
+ private ITaskExecutorClient taskExecutorClient;
+
+ private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
+
+ public WorkerGroupQueueLooper() {
+ super("WorkerGroupQueueLooper");
+ }
+
+ @Override
+ public synchronized void start() {
+ if (!RUNNING_FLAG.compareAndSet(false, true)) {
+ log.error("The WorkerGroupQueueLooper already started, will not
start again");
+ return;
+ }
+ log.info("WorkerGroupQueueLooper starting...");
+ super.start();
+ log.info("WorkerGroupQueueLooper started...");
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (RUNNING_FLAG.compareAndSet(true, false)) {
+ log.info("WorkerGroupQueueLooper stopping...");
+ log.info("WorkerGroupQueueLooper stopped...");
+ } else {
+ log.error("WorkerGroupQueueLooper is not started");
+ }
+ }
+
+ @Override
+ public void run() {
+ while (RUNNING_FLAG.get()) {
+ doDispatch();
+ }
Review Comment:
This loop will drive CPU usage to 100%.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/WorkerGroupQueueMap.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.DelayQueue;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkerGroupQueueMap {
+
+ private final Map<String,
DelayQueue<PriorityDelayEntry<ITaskExecutionRunnable>>> queueMap = new
HashMap<>();
Review Comment:
```suggestion
private final Map<String,
DelayQueue<PriorityDelayEntry<ITaskExecutionRunnable>>> queueMap = new
ConcurrentHashMap<>();
```
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java:
##########
@@ -42,6 +43,9 @@ public class WorkflowEngine implements AutoCloseable {
@Autowired
private LogicTaskEngineDelegator logicTaskEngineDelegator;
+ @Autowired
+ private WorkerGroupQueueLooper workerGroupQueueLooper;
Review Comment:
```suggestion
private WorkerGroupTaskDispatchWaitingQueueLooper
workerGroupTaskDispatchWatingQueueLooper;
```
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java:
##########
@@ -57,19 +59,21 @@ public synchronized void
dispatchTaskExecuteRunnable(ITaskExecutionRunnable ITas
public synchronized void
dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable
taskExecutionRunnable,
long
delayTimeMills) {
waitingTaskInstanceIds.add(taskExecutionRunnable.getTaskInstance().getId());
- priorityDelayQueue.add(new DelayEntry<>(delayTimeMills,
taskExecutionRunnable));
+ priorityQueue.add(new DelayEntry<>(delayTimeMills,
taskExecutionRunnable));
}
/**
- * Consume {@link ITaskExecutionRunnable} from the {@link
PriorityDelayQueue}, only the delay time <= 0 can be consumed.
+ * Consume {@link ITaskExecutionRunnable} from the {@link DelayQueue},
only the delay time <= 0 can be consumed.
*/
@SneakyThrows
- public ITaskExecutionRunnable takeTaskExecuteRunnable() {
- ITaskExecutionRunnable taskExecutionRunnable =
priorityDelayQueue.take().getData();
+ public DelayEntry<ITaskExecutionRunnable> takeTaskExecuteRunnable() {
Review Comment:
```suggestion
public ITaskExecutionRunnable takeTaskExecuteRunnable() {
```
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java:
##########
@@ -65,25 +64,10 @@ public void run() {
}
void doDispatch() {
- final ITaskExecutionRunnable taskExecutionRunnable =
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
- final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
- try {
- final TaskExecutionStatus status = taskInstance.getState();
- if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status !=
TaskExecutionStatus.DELAY_EXECUTION) {
- log.warn("The TaskInstance {} state is : {}, will not
dispatch", taskInstance.getName(), status);
- return;
- }
- taskExecutorClient.dispatch(taskExecutionRunnable);
- } catch (Exception e) {
- // If dispatch failed, will put the task back to the queue
- // The task will be dispatched after waiting time.
- // the waiting time will increase multiple of times, but will not
exceed 60 seconds
- long waitingTimeMills = Math.min(
-
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() *
1_000L, 60_000L);
-
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
- waitingTimeMills);
- log.error("Dispatch Task: {} failed will retry after: {}/ms",
taskInstance.getName(), waitingTimeMills, e);
- }
+ DelayEntry<ITaskExecutionRunnable> delayEntry =
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
Review Comment:
A log statement needs to be added here.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java:
##########
@@ -38,8 +38,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends
BaseDaemonThread imple
private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
@Autowired
- private ITaskExecutorClient taskExecutorClient;
-
+ private WorkerGroupQueueMap workerGroupQueueMap;
Review Comment:
```suggestion
private WorkerGroupTaskDispatchWaitingQueues
workerGroupTaskDispatchWaitingQueues;
```
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java:
##########
@@ -54,14 +54,6 @@ public long getDelay(@NotNull TimeUnit unit) {
@Override
public int compareTo(@NotNull Delayed o) {
DelayEntry<V> other = (DelayEntry<V>) o;
- int delayTimeMillsCompareResult = Long.compare(delayTimeMills,
other.delayTimeMills);
- if (delayTimeMillsCompareResult != 0) {
- return delayTimeMillsCompareResult;
- }
-
- if (data == null || other.data == null) {
- return 0;
- }
Review Comment:
Don't make this kind of change. This class is `DelayEntry`; if you remove the
compare logic, this class cannot work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]