det101 commented on code in PR #17037:
URL: 
https://github.com/apache/dolphinscheduler/pull/17037#discussion_r2034810130


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
+import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the 
task queue.
+ * The main responsibilities include:
+ * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for 
dispatch.
+ * 2. Re-queuing tasks that fail to dispatch according to retry logic.
+ * 3. Ensuring thread safety and correct state transitions during task 
processing.
+ */
+@Slf4j
+public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
+
+    private final ITaskExecutorClient taskExecutorClient;
+
+    // todo: The current queue is flawed. When a high-priority task fails,
+    // it will be delayed and will not return to the first or second position.
+    // Tasks with the same priority will preempt its position.
+    // If it needs to be placed at the front of the queue, the queue needs to 
be re-implemented.
+    private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> 
workerGroupQueue;
+
+    private final AtomicReference<DispatchWorkerStatus> status = new 
AtomicReference<>(DispatchWorkerStatus.INIT);
+
+    public WorkerGroupTaskDispatcher(String workerGroupName, 
ITaskExecutorClient taskExecutorClient) {
+        super("WorkerGroupTaskDispatcher-" + workerGroupName);
+        this.taskExecutorClient = taskExecutorClient;
+        this.workerGroupQueue = new PriorityDelayQueue<>();
+    }
+
+    /**
+     * Adds a task to the worker group queue.
+     *
+     * This method wraps the given task execution object into a priority and 
delay-based task entry and adds it to the worker group queue.
+     * The task is only added if the current dispatcher status is either 
STARTED or INIT. If the dispatcher is in any other state,
+     * the task addition will fail, and a warning message will be logged.
+     *
+     * @param taskExecutionRunnable The task execution object to add to the 
queue, which implements the {@link ITaskExecutionRunnable} interface.
+     * @param delayTimeMills The delay time in milliseconds before the task 
should be executed.
+     * @return true if the task was successfully added to the queue, false 
otherwise.
+     */
+    public boolean addTaskToWorkerGroupQueue(ITaskExecutionRunnable 
taskExecutionRunnable,
+                                             long delayTimeMills) {
+        if (status.get() == DispatchWorkerStatus.STARTED || status.get() == 
DispatchWorkerStatus.INIT) {
+            workerGroupQueue.add(new 
PriorityAndDelayBasedTaskEntry(delayTimeMills, taskExecutionRunnable));
+            return true;
+        } else {
+            log.warn("The {} status is {}, task can not add Queue, it will 
fail", this.getName(), status.get());
+        }
+        return false;
+    }
+
+    @Override
+    public synchronized void start() {
+        if (status.compareAndSet(DispatchWorkerStatus.INIT, 
DispatchWorkerStatus.STARTED)) {
+            log.info("The {} starting...", this.getName());
+            super.start();
+            log.info("The {}  started", this.getName());
+        } else {
+            log.error("The {} status is {}, will not start again", 
this.getName(), status.get());
+        }
+    }
+
+    public void markDispatcherClosing() {
+        if (status.get() != DispatchWorkerStatus.CLOSED) {
+            status.set(DispatchWorkerStatus.CLOSING);
+        } else {
+            log.warn("The {} is Closed, will not markDispatcherClosing again", 
this.getName());
+        }
+    }
+
+    public void markDispatcherStart() {
+        if (status.compareAndSet(DispatchWorkerStatus.CLOSING, 
DispatchWorkerStatus.STARTED)) {
+            log.info("The {} markDispatcherStart...", this.getName());
+        } else {
+            log.warn("The {} status is {}, will not markDispatcherStart", 
this.getName(), status.get());
+        }
+    }

Review Comment:
   Remove the markstart logic for WorkerGroupTaskDispatcher. Since this 
scenario is rare and the logic is simplified for now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to