ruanwenjun commented on code in PR #17037:
URL: 
https://github.com/apache/dolphinscheduler/pull/17037#discussion_r2001070666


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import 
org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
+import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task 
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and 
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as 
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupTaskDispatcherManager implements AutoCloseable, 
WorkerGroupChangeNotifier.WorkerGroupListener {
+
+    private static final long CHECK_DELETE_DISPATCH_WORKER_PERIOD_SECONDS = 5;
+
+    @Autowired
+    private ITaskExecutorClient taskExecutorClient;
+
+    @Getter
+    private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> 
dispatchWorkerMap;
+
+    public WorkerGroupTaskDispatcherManager() {
+        dispatchWorkerMap = new ConcurrentHashMap<>();
+        
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleAtFixedRate(
+                this::checkDeleteDispatchWorkerComplete, 0,
+                CHECK_DELETE_DISPATCH_WORKER_PERIOD_SECONDS, TimeUnit.SECONDS);
+    }
+
+    @PostConstruct
+    public void init() {
+        this.addWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
+    }
+
+    /**
+     * Adds a task to the specified worker group queue and starts or wakes up 
the corresponding processing loop.
+     *
+     * @param workerGroup the identifier for the worker group, used to 
distinguish different task queues
+     * @param taskExecutionRunnable an instance of ITaskExecutionRunnable 
representing the task to be executed
+     * @param delayTimeMills the delay time before the task is executed, in 
milliseconds
+     */
+    public Boolean add(String workerGroup, ITaskExecutionRunnable 
taskExecutionRunnable, long delayTimeMills) {
+        WorkerGroupTaskDispatcher workerGroupTaskDispatcher = 
dispatchWorkerMap.get(workerGroup);
+        if (workerGroupTaskDispatcher != null) {
+            workerGroupTaskDispatcher.add(taskExecutionRunnable, 
delayTimeMills);
+            log.info("queue size {}", workerGroupTaskDispatcher.size());

Review Comment:
   ```suggestion
               
   ```



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java:
##########
@@ -31,24 +32,25 @@
 
 /**
  * The class is used to store {@link ITaskExecutionRunnable} which needs to be 
dispatched. The {@link ITaskExecutionRunnable}
- * will be stored in {@link PriorityDelayQueue}, if the {@link 
ITaskExecutionRunnable}'s delay time is 0, then it will be
+ * will be stored in {@link DelayQueue}, if the {@link 
ITaskExecutionRunnable}'s delay time is 0, then it will be
  * consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
  * <p>
- * The order of {@link ITaskExecutionRunnable} in the {@link 
PriorityDelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}.
+ * The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is 
determined by {@link ITaskExecutionRunnable#compareTo}.
  */
 @Slf4j
 @Component
 public class GlobalTaskDispatchWaitingQueue {
 
     private final Set<Integer> waitingTaskInstanceIds = 
ConcurrentHashMap.newKeySet();
-    private final PriorityDelayQueue<DelayEntry<ITaskExecutionRunnable>> 
priorityDelayQueue =
-            new PriorityDelayQueue<>();
+
+    private final DelayQueue<TimeBasedTaskExecutionRunnableComparableEntry> 
delayQueue =
+            new DelayQueue<>();
 
     /**
      * Submit a {@link ITaskExecutionRunnable} with delay time 0, it will be 
consumed immediately.
      */
-    public synchronized void 
dispatchTaskExecuteRunnable(ITaskExecutionRunnable ITaskExecutionRunnable) {
-        dispatchTaskExecuteRunnableWithDelay(ITaskExecutionRunnable, 0);
+    public synchronized void 
dispatchTaskExecuteRunnable(ITaskExecutionRunnable iTaskExecutionRunnable) {
+        dispatchTaskExecuteRunnableWithDelay(iTaskExecutionRunnable, 0);

Review Comment:
   Why is the delay time 0 here? If so, the `delayQueue` is meaningless.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
+import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the 
task queue.
+ * The main responsibilities include:
+ * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for 
dispatch.
+ * 2. Re-queuing tasks that fail to dispatch according to retry logic.
+ * 3. Ensuring thread safety and correct state transitions during task 
processing.
+ */
+@Slf4j
+public class WorkerGroupTaskDispatcher extends BaseDaemonThread implements 
AutoCloseable {
+
+    private final ITaskExecutorClient taskExecutorClient;
+
+    // todo: The current queue is flawed. When a high-priority task fails,
+    // it will be delayed and will not return to the first or second position.
+    // Tasks with the same priority will preempt its position.
+    // If it needs to be placed at the front of the queue, the queue needs to 
be re-implemented.
+    private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> 
workerGroupQueue;
+
+    private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
+
+    @Getter
+    private DispatchWorkerStatus status;
+
+    public WorkerGroupTaskDispatcher(String workerGroupName, 
ITaskExecutorClient taskExecutorClient) {
+        super("WorkerGroupTaskDispatcher-" + workerGroupName);
+        this.taskExecutorClient = taskExecutorClient;
+        this.workerGroupQueue = new PriorityDelayQueue<>();
+        status = DispatchWorkerStatus.STARTED;
+    }
+
+    public void add(ITaskExecutionRunnable taskExecutionRunnable, long 
delayTimeMills) {
+        workerGroupQueue.add(new 
PriorityAndDelayBasedTaskEntry(delayTimeMills, taskExecutionRunnable));
+    }
+
+    public int size() {
+        return workerGroupQueue.size();
+    }
+
+    @Override
+    public synchronized void start() {
+        if (!RUNNING_FLAG.compareAndSet(false, true)) {
+            log.error("The {} already started, will not start again", 
this.getName());
+            return;
+        }
+        log.info("{} starting...", this.getName());
+        super.start();
+        log.info("{} started...", this.getName());
+    }

Review Comment:
   If a dispatcher is closed and then started again, can it start twice? This 
is a thread!



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import 
org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
+import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task 
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and 
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as 
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupTaskDispatcherManager implements AutoCloseable, 
WorkerGroupChangeNotifier.WorkerGroupListener {
+
+    private static final long CHECK_DELETE_DISPATCH_WORKER_PERIOD_SECONDS = 5;
+
+    @Autowired
+    private ITaskExecutorClient taskExecutorClient;
+
+    @Getter
+    private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> 
dispatchWorkerMap;
+
+    public WorkerGroupTaskDispatcherManager() {
+        dispatchWorkerMap = new ConcurrentHashMap<>();
+        
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleAtFixedRate(
+                this::checkDeleteDispatchWorkerComplete, 0,
+                CHECK_DELETE_DISPATCH_WORKER_PERIOD_SECONDS, TimeUnit.SECONDS);
+    }
+
+    @PostConstruct
+    public void init() {
+        this.addWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
+    }
+
+    /**
+     * Adds a task to the specified worker group queue and starts or wakes up 
the corresponding processing loop.
+     *
+     * @param workerGroup the identifier for the worker group, used to 
distinguish different task queues
+     * @param taskExecutionRunnable an instance of ITaskExecutionRunnable 
representing the task to be executed
+     * @param delayTimeMills the delay time before the task is executed, in 
milliseconds
+     */
+    public Boolean add(String workerGroup, ITaskExecutionRunnable 
taskExecutionRunnable, long delayTimeMills) {
+        WorkerGroupTaskDispatcher workerGroupTaskDispatcher = 
dispatchWorkerMap.get(workerGroup);
+        if (workerGroupTaskDispatcher != null) {
+            workerGroupTaskDispatcher.add(taskExecutionRunnable, 
delayTimeMills);
+            log.info("queue size {}", workerGroupTaskDispatcher.size());
+            return true;
+        } else {
+            log.error("workerGroupTaskDispatcher {} not found", workerGroup);
+        }
+        return false;

Review Comment:
   If the WorkerGroupTaskDispatcher doesn't exist, will the task lifecycle 
break? I don't see anywhere that deals with the result.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import 
org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
+import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task 
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and 
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as 
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupTaskDispatcherManager implements AutoCloseable, 
WorkerGroupChangeNotifier.WorkerGroupListener {
+
+    private static final long CHECK_DELETE_DISPATCH_WORKER_PERIOD_SECONDS = 5;
+
+    @Autowired
+    private ITaskExecutorClient taskExecutorClient;
+
+    @Getter
+    private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> 
dispatchWorkerMap;
+
+    public WorkerGroupTaskDispatcherManager() {
+        dispatchWorkerMap = new ConcurrentHashMap<>();
+        
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleAtFixedRate(
+                this::checkDeleteDispatchWorkerComplete, 0,
+                CHECK_DELETE_DISPATCH_WORKER_PERIOD_SECONDS, TimeUnit.SECONDS);
+    }
+
+    @PostConstruct
+    public void init() {
+        this.addWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
+    }
+
+    /**
+     * Adds a task to the specified worker group queue and starts or wakes up 
the corresponding processing loop.
+     *
+     * @param workerGroup the identifier for the worker group, used to 
distinguish different task queues
+     * @param taskExecutionRunnable an instance of ITaskExecutionRunnable 
representing the task to be executed
+     * @param delayTimeMills the delay time before the task is executed, in 
milliseconds
+     */
+    public Boolean add(String workerGroup, ITaskExecutionRunnable 
taskExecutionRunnable, long delayTimeMills) {
+        WorkerGroupTaskDispatcher workerGroupTaskDispatcher = 
dispatchWorkerMap.get(workerGroup);
+        if (workerGroupTaskDispatcher != null) {
+            workerGroupTaskDispatcher.add(taskExecutionRunnable, 
delayTimeMills);
+            log.info("queue size {}", workerGroupTaskDispatcher.size());
+            return true;
+        } else {
+            log.error("workerGroupTaskDispatcher {} not found", workerGroup);
+        }
+        return false;
+    }
+
+    /**
+     * Stops a specific worker group's task dispatch waiting queue looper.
+     *
+     * @param workerGroup the identifier for the worker group
+     */
+    public synchronized void deleteWorkerGroup(String workerGroup) throws 
Exception {
+        WorkerGroupTaskDispatcher workerGroupTaskDispatcher = 
dispatchWorkerMap.get(workerGroup);
+        if (workerGroupTaskDispatcher != null) {
+            workerGroupTaskDispatcher.close();
+        } else {
+            log.warn("workerGroupTaskDispatcher {} not found", workerGroup);
+        }
+    }
+
+    /**
+     * add workerGroup
+     *
+     * @param workerGroup the identifier for the worker group
+     */
+    public synchronized void addWorkerGroup(String workerGroup) {
+        WorkerGroupTaskDispatcher looper =
+                dispatchWorkerMap.computeIfAbsent(workerGroup,
+                        k -> new WorkerGroupTaskDispatcher(workerGroup, 
taskExecutorClient));
+        looper.start();
+    }
+
+    /**
+     * Stop all workerGroupTaskDispatchWaitingQueueLooper
+     */
+    @Override
+    public void close() throws Exception {
+        log.info("WorkerGroupTaskDispatcherManager start close");
+        for (Map.Entry<String, WorkerGroupTaskDispatcher> entry : 
dispatchWorkerMap.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (Exception e) {
+                log.error("stop worker group error", e);
+            }
+        }
+        log.info("WorkerGroupTaskDispatcherManager closed");
+    }
+
+    @Override
+    public void onWorkerGroupAdd(List<WorkerGroup> workerGroups) {
+        for (WorkerGroup workerGroup : workerGroups) {
+            this.addWorkerGroup(workerGroup.getName());
+        }
+    }
+
+    @Override
+    public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
+        String workerGroupsString = workerGroups.stream()
+                .map(WorkerGroup::getName)
+                .collect(Collectors.joining(", "));
+        log.info("Worker groups: {}", workerGroupsString);
+    }
+
+    @Override
+    public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
+        for (WorkerGroup workerGroup : workerGroups) {
+            try {
+                this.deleteWorkerGroup(workerGroup.getName());
+            } catch (Exception e) {
+                log.error("stop worker group error", e);
+            }
+        }
+    }
+
+    private void checkDeleteDispatchWorkerComplete() {
+        for (Map.Entry<String, WorkerGroupTaskDispatcher> entry : 
dispatchWorkerMap.entrySet()) {
+            String workerGroup = entry.getKey();
+            WorkerGroupTaskDispatcher workerGroupTaskDispatcher = 
entry.getValue();
+            switch (workerGroupTaskDispatcher.getStatus()) {
+                case CLOSING:
+                    try (WorkerGroupTaskDispatcher ignored = 
workerGroupTaskDispatcher) {
+                        log.info("try to delete worker group {}", workerGroup);
+                    } catch (Exception e) {
+                        log.error("stop worker group error", e);
+                    }
+                    break;
+                case CLOSED:
+                    try (WorkerGroupTaskDispatcher ignored = 
dispatchWorkerMap.remove(workerGroup)) {
+                        log.info("success remove worker group {}", 
workerGroup);
+                    } catch (Exception e) {
+                        log.error("stop worker group error", e);
+                    }
+                    break;
+                default:
+                    log.debug("worker group {} status {}", workerGroup, 
workerGroupTaskDispatcher.getStatus());
+                    break;
+            }
+        }
+    }

Review Comment:
   Once a dispatcher's status is closed, there might still be tasks in its 
queue; I don't see any control over this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to