ruanwenjun commented on code in PR #17037:
URL: https://github.com/apache/dolphinscheduler/pull/17037#discussion_r1980534651


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java:
##########
@@ -31,18 +32,19 @@
 
 /**
  * The class is used to store {@link ITaskExecutionRunnable} which needs to be dispatched. The {@link ITaskExecutionRunnable}
- * will be stored in {@link PriorityDelayQueue}, if the {@link ITaskExecutionRunnable}'s delay time is 0, then it will be
+ * will be stored in {@link DelayQueue}, if the {@link ITaskExecutionRunnable}'s delay time is 0, then it will be
  * consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
  * <p>
- * The order of {@link ITaskExecutionRunnable} in the {@link PriorityDelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
+ * The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
  */
 @Slf4j
 @Component
 public class GlobalTaskDispatchWaitingQueue {
 
     private final Set<Integer> waitingTaskInstanceIds = ConcurrentHashMap.newKeySet();
-    private final PriorityDelayQueue<DelayEntry<ITaskExecutionRunnable>> priorityDelayQueue =
-            new PriorityDelayQueue<>();
+
+    private final PriorityBlockingQueue<DelayEntry<ITaskExecutionRunnable>> priorityQueue =

Review Comment:
   Don't you need to use `DelayQueue` here? Otherwise, how do you make sure an
   item is only polled out after its delay time has elapsed?
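
To illustrate the concern, here is a minimal sketch of how the two JDK queues differ. The `Entry` class is a hypothetical stand-in for `DelayEntry`, not the project's class: a `PriorityBlockingQueue` only orders its elements, while a `DelayQueue` also withholds the head until `getDelay` reports it as expired.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class DelayVersusPriorityDemo {

    // Hypothetical stand-in for DelayEntry: it expires at triggerAtNanos.
    static final class Entry implements Delayed {
        final String name;
        final long triggerAtNanos;

        Entry(String name, long delayMillis) {
            this.name = name;
            this.triggerAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(triggerAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // PriorityBlockingQueue ignores getDelay(): the entry comes out immediately.
    static boolean priorityQueueReleasesEarly() {
        PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>();
        queue.add(new Entry("task", 60_000));
        return queue.poll() != null;
    }

    // DelayQueue.poll() returns null while the head's delay has not expired.
    static boolean delayQueueReleasesEarly() {
        DelayQueue<Entry> queue = new DelayQueue<>();
        queue.add(new Entry("task", 60_000));
        return queue.poll() != null;
    }

    public static void main(String[] args) {
        System.out.println("PriorityBlockingQueue released early: " + priorityQueueReleasesEarly());
        System.out.println("DelayQueue released early: " + delayQueueReleasesEarly());
    }
}
```

With a 60-second delay, the first helper hands the entry out at once and the second does not, which is the behaviour the question above is about.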



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayEntry.java:
##########
@@ -17,25 +17,24 @@
 
 package org.apache.dolphinscheduler.server.master.runner.queue;
 
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
 
-import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
 
-public class PriorityDelayQueue<V extends DelayEntry> {
+public class PriorityDelayEntry<V extends Comparable<V>> extends DelayEntry {

Review Comment:
   Don't change this class.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupQueueLooper.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.runner.queue.WorkerGroupQueueMap;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class WorkerGroupQueueLooper extends BaseDaemonThread implements AutoCloseable {
+
+    @Autowired
+    private WorkerGroupQueueMap workerGroupQueueMap;
+
+    @Autowired
+    private ITaskExecutorClient taskExecutorClient;
+
+    private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
+
+    public WorkerGroupQueueLooper() {
+        super("WorkerGroupQueueLooper");
+    }
+
+    @Override
+    public synchronized void start() {
+        if (!RUNNING_FLAG.compareAndSet(false, true)) {
+            log.error("The WorkerGroupQueueLooper already started, will not start again");
+            return;
+        }
+        log.info("WorkerGroupQueueLooper starting...");
+        super.start();
+        log.info("WorkerGroupQueueLooper started...");
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (RUNNING_FLAG.compareAndSet(true, false)) {
+            log.info("WorkerGroupQueueLooper stopping...");
+            log.info("WorkerGroupQueueLooper stopped...");
+        } else {
+            log.error("WorkerGroupQueueLooper is not started");
+        }
+    }
+
+    @Override
+    public void run() {
+        while (RUNNING_FLAG.get()) {
+            doDispatch();
+        }

Review Comment:
   This loop will spin and push CPU usage to 100%.
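
One possible way to avoid the spin, sketched under assumptions: the looper blocks on the queue with a timeout instead of polling in a tight loop. `BlockingLooperSketch`, its `String` tasks and the 100 ms timeout are hypothetical and only illustrate the pattern; they are not the project's classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingLooperSketch implements Runnable {

    private final BlockingQueue<String> queue;
    private final AtomicBoolean running = new AtomicBoolean(true);
    final AtomicInteger dispatched = new AtomicInteger();

    BlockingLooperSketch(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                // Parks the thread for up to 100 ms when the queue is empty,
                // so an idle looper does not burn CPU.
                String task = queue.poll(100, TimeUnit.MILLISECONDS);
                if (task != null) {
                    dispatched.incrementAndGet(); // placeholder for the real dispatch
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        running.set(false);
    }

    // Starts the looper, feeds it two tasks and returns how many were dispatched.
    static int runOnce() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        BlockingLooperSketch looper = new BlockingLooperSketch(queue);
        Thread thread = new Thread(looper, "looper-sketch");
        thread.setDaemon(true);
        thread.start();
        queue.add("task-1");
        queue.add("task-2");
        try {
            long deadline = System.currentTimeMillis() + 2_000;
            while (looper.dispatched.get() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            looper.stop();
            thread.join(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return looper.dispatched.get();
    }

    public static void main(String[] args) {
        System.out.println("dispatched: " + runOnce());
    }
}
```

The timed `poll` also lets the loop re-check the running flag regularly, so `stop()` takes effect even when no tasks arrive.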



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/WorkerGroupQueueMap.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.DelayQueue;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WorkerGroupQueueMap {
+
+    private final Map<String, DelayQueue<PriorityDelayEntry<ITaskExecutionRunnable>>> queueMap = new HashMap<>();

Review Comment:
   ```suggestion
       private final Map<String, DelayQueue<PriorityDelayEntry<ITaskExecutionRunnable>>> queueMap = new ConcurrentHashMap<>();
   ```
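
A small sketch of why the concurrent map matters, assuming the map is read and written from more than one thread. `WorkerGroupQueuesSketch` and its `String` tasks are hypothetical placeholders; `computeIfAbsent` on a `ConcurrentHashMap` creates each worker group's queue atomically, which a plain `HashMap` does not guarantee.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class WorkerGroupQueuesSketch {

    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

    // Creates the per-group queue at most once, even under concurrent callers.
    void add(String workerGroup, String task) {
        queues.computeIfAbsent(workerGroup, key -> new LinkedBlockingQueue<>()).add(task);
    }

    int size(String workerGroup) {
        BlockingQueue<String> queue = queues.get(workerGroup);
        return queue == null ? 0 : queue.size();
    }

    // Adds tasks to one group from several threads and returns the final queue size.
    static int concurrentAdds(int threads, int perThread) {
        WorkerGroupQueuesSketch queues = new WorkerGroupQueuesSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queues.add("default", "task-" + j);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queues.size("default");
    }

    public static void main(String[] args) {
        System.out.println("queued: " + concurrentAdds(4, 1_000));
    }
}
```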



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java:
##########
@@ -42,6 +43,9 @@ public class WorkflowEngine implements AutoCloseable {
     @Autowired
     private LogicTaskEngineDelegator logicTaskEngineDelegator;
 
+    @Autowired
+    private WorkerGroupQueueLooper workerGroupQueueLooper;

Review Comment:
   ```suggestion
       private WorkerGroupTaskDispatchWaitingQueueLooper workerGroupTaskDispatchWaitingQueueLooper;
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java:
##########
@@ -57,19 +59,21 @@ public synchronized void dispatchTaskExecuteRunnable(ITaskExecutionRunnable ITas
     public synchronized void dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable taskExecutionRunnable,
                                                                   long delayTimeMills) {
         waitingTaskInstanceIds.add(taskExecutionRunnable.getTaskInstance().getId());
-        priorityDelayQueue.add(new DelayEntry<>(delayTimeMills, taskExecutionRunnable));
+        priorityQueue.add(new DelayEntry<>(delayTimeMills, taskExecutionRunnable));
     }
 
     /**
-     * Consume {@link ITaskExecutionRunnable} from the {@link PriorityDelayQueue}, only the delay time <= 0 can be consumed.
+     * Consume {@link ITaskExecutionRunnable} from the {@link DelayQueue}, only the delay time <= 0 can be consumed.
      */
     @SneakyThrows
-    public ITaskExecutionRunnable takeTaskExecuteRunnable() {
-        ITaskExecutionRunnable taskExecutionRunnable = priorityDelayQueue.take().getData();
+    public DelayEntry<ITaskExecutionRunnable> takeTaskExecuteRunnable() {

Review Comment:
   ```suggestion
       public ITaskExecutionRunnable takeTaskExecuteRunnable() {
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java:
##########
@@ -65,25 +64,10 @@ public void run() {
     }
 
     void doDispatch() {
-        final ITaskExecutionRunnable taskExecutionRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
-        final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
-        try {
-            final TaskExecutionStatus status = taskInstance.getState();
-            if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != TaskExecutionStatus.DELAY_EXECUTION) {
-                log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), status);
-                return;
-            }
-            taskExecutorClient.dispatch(taskExecutionRunnable);
-        } catch (Exception e) {
-            // If dispatch failed, will put the task back to the queue
-            // The task will be dispatched after waiting time.
-            // the waiting time will increase multiple of times, but will not exceed 60 seconds
-            long waitingTimeMills = Math.min(
-                    taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
-            globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
-                    waitingTimeMills);
-            log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e);
-        }
+        DelayEntry<ITaskExecutionRunnable> delayEntry = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();

Review Comment:
   Please add a log statement here.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java:
##########
@@ -38,8 +38,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple
     private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
 
     @Autowired
-    private ITaskExecutorClient taskExecutorClient;
-
+    private WorkerGroupQueueMap workerGroupQueueMap;

Review Comment:
   ```suggestion
       private WorkerGroupTaskDispatchWaitingQueues workerGroupTaskDispatchWaitingQueues;
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java:
##########
@@ -54,14 +54,6 @@ public long getDelay(@NotNull TimeUnit unit) {
     @Override
     public int compareTo(@NotNull Delayed o) {
         DelayEntry<V> other = (DelayEntry<V>) o;
-        int delayTimeMillsCompareResult = Long.compare(delayTimeMills, 
other.delayTimeMills);
-        if (delayTimeMillsCompareResult != 0) {
-            return delayTimeMillsCompareResult;
-        }
-
-        if (data == null || other.data == null) {
-            return 0;
-        }

Review Comment:
   Don't make this kind of change. This class is `DelayEntry`; if you remove
   the compare logic, it can no longer work.
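
For context, a minimal sketch of the ordering the removed lines provided: compare by delay first, then fall back to the payload's own ordering. `OrderedDelayEntrySketch` and its absolute `triggerAtMillis` field are hypothetical; the real `DelayEntry` may store its delay differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class OrderedDelayEntrySketch<V extends Comparable<V>> implements Delayed {

    private final long triggerAtMillis; // hypothetical: absolute expiry time
    private final V data;

    OrderedDelayEntrySketch(long triggerAtMillis, V data) {
        this.triggerAtMillis = triggerAtMillis;
        this.data = data;
    }

    V getData() {
        return data;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    @SuppressWarnings("unchecked")
    public int compareTo(Delayed o) {
        OrderedDelayEntrySketch<V> other = (OrderedDelayEntrySketch<V>) o;
        // Earliest expiry first, so the head of the DelayQueue is the next entry due.
        int byTime = Long.compare(triggerAtMillis, other.triggerAtMillis);
        if (byTime != 0) {
            return byTime;
        }
        if (data == null || other.data == null) {
            return 0;
        }
        // Same expiry: fall back to the payload's own ordering.
        return data.compareTo(other.data);
    }

    // Adds three already-expired entries and returns the order they are polled in.
    static List<String> drainOrder() {
        long now = System.currentTimeMillis();
        DelayQueue<OrderedDelayEntrySketch<String>> queue = new DelayQueue<>();
        queue.add(new OrderedDelayEntrySketch<>(now - 10, "b"));
        queue.add(new OrderedDelayEntrySketch<>(now - 10, "a"));
        queue.add(new OrderedDelayEntrySketch<>(now - 20, "c"));
        List<String> order = new ArrayList<>();
        OrderedDelayEntrySketch<String> entry;
        while ((entry = queue.poll()) != null) {
            order.add(entry.getData());
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
    }
}
```

`DelayQueue` relies on `compareTo` to decide which element sits at the head, so an ordering that ignores the delay can leave an expired entry stuck behind one that is not yet due.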



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
