ruanwenjun commented on code in PR #17037: URL: https://github.com/apache/dolphinscheduler/pull/17037#discussion_r1998123165
########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DispatchWorkerStatus.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +public enum DispatchWorkerStatus { + DEFAULT, + DELETE_SUCCESS, + DELETING Review Comment: ```suggestion STARTED, CLOSING, CLOSED, ``` ########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; +import org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier; +import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.annotation.PostConstruct; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * WorkerGroupTaskDispatcherManager is responsible for managing the task dispatching for worker groups. + * It maintains a mapping of worker groups to their task dispatchers and priority delay queues, + * and supports adding tasks, starting and stopping worker groups, as well as cleaning up resources upon shutdown. 
+ */ +@Component +@Slf4j +public class WorkerGroupTaskDispatcherManager implements AutoCloseable, WorkerGroupChangeNotifier.WorkerGroupListener { + + private static final long CHECK_DELETE_DISPATCH_WORKER_PERIOD_SECONDS = 5; + + @Autowired + private ITaskExecutorClient taskExecutorClient; + + @Getter + private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher> dispatchWorkerMap; + + private final ScheduledExecutorService scheduler; Review Comment: You cannot control this threadpool, so don't maintain this. ```suggestion ``` ########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.runner.queue; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +import org.jetbrains.annotations.NotNull; + +public class PriorityAndDelayBasedTaskEntry<V extends Comparable<V>> extends DelayEntry<V> { Review Comment: ```suggestion public class PriorityDelayTaskEntry<V extends Comparable<V>> extends DelayEntry<V> { ``` ########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry; +import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue; + +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the task queue. + * The main responsibilities include: + * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for dispatch. + * 2. Re-queuing tasks that fail to dispatch according to retry logic. + * 3. Ensuring thread safety and correct state transitions during task processing. + */ +@Slf4j +public class WorkerGroupTaskDispatcher extends BaseDaemonThread implements AutoCloseable { + + private final ITaskExecutorClient taskExecutorClient; + + // todo: The current queue is flawed. When a high-priority task fails, + // it will be delayed and will not return to the first or second position. + // Tasks with the same priority will preempt its position. + // If it needs to be placed at the front of the queue, the queue needs to be re-implemented. 
+ private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> workerGroupQueue; + + private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); + + @Getter + private DispatchWorkerStatus status; + + public WorkerGroupTaskDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) { + super("WorkerGroupTaskDispatcher-" + workerGroupName); + this.taskExecutorClient = taskExecutorClient; + this.workerGroupQueue = new PriorityDelayQueue<>(); + status = DispatchWorkerStatus.DEFAULT; + } + + public void add(ITaskExecutionRunnable taskExecutionRunnable, long delayTimeMills) { + workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry(delayTimeMills, taskExecutionRunnable)); + } + + public int size() { + return workerGroupQueue.size(); + } + + @Override + public synchronized void start() { + if (!RUNNING_FLAG.compareAndSet(false, true)) { + log.error("The {} already started, will not start again", this.getName()); + return; + } + log.info("{} starting...", this.getName()); + super.start(); + log.info("{} started...", this.getName()); + } + + @Override + public void close() throws Exception { Review Comment: You need to do concurrency control in this class. 
########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java: ########## @@ -59,6 +62,10 @@ public void start() { log.info("ClusterManager started..."); } + public void registerWorkerGroupListener(WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager) { + this.workerGroupTaskDispatcherManager = workerGroupTaskDispatcherManager; Review Comment: ```suggestion this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerGroupTaskDispatcherManager); ``` ########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java: ########## @@ -48,6 +49,8 @@ public class ClusterManager { @Autowired private RegistryClient registryClient; + private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager; Review Comment: ```suggestion ``` ########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java: ########## @@ -48,6 +49,8 @@ public class ClusterManager { @Autowired private RegistryClient registryClient; + private WorkerGroupTaskDispatcherManager workerGroupTaskDispatcherManager; Review Comment: Use a list here or we can directly remove this line. ########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry; +import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue; + +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the task queue. + * The main responsibilities include: + * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for dispatch. + * 2. Re-queuing tasks that fail to dispatch according to retry logic. + * 3. Ensuring thread safety and correct state transitions during task processing. + */ +@Slf4j +public class WorkerGroupTaskDispatcher extends BaseDaemonThread implements AutoCloseable { + + private final ITaskExecutorClient taskExecutorClient; + + // todo: The current queue is flawed. When a high-priority task fails, + // it will be delayed and will not return to the first or second position. + // Tasks with the same priority will preempt its position. 
+ // If it needs to be placed at the front of the queue, the queue needs to be re-implemented. + private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> workerGroupQueue; + + private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); + + @Getter + private DispatchWorkerStatus status; + + public WorkerGroupTaskDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) { + super("WorkerGroupTaskDispatcher-" + workerGroupName); + this.taskExecutorClient = taskExecutorClient; + this.workerGroupQueue = new PriorityDelayQueue<>(); + status = DispatchWorkerStatus.DEFAULT; + } + + public void add(ITaskExecutionRunnable taskExecutionRunnable, long delayTimeMills) { + workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry(delayTimeMills, taskExecutionRunnable)); + } + + public int size() { + return workerGroupQueue.size(); + } + + @Override + public synchronized void start() { + if (!RUNNING_FLAG.compareAndSet(false, true)) { + log.error("The {} already started, will not start again", this.getName()); + return; + } + log.info("{} starting...", this.getName()); + super.start(); + log.info("{} started...", this.getName()); + } + + @Override + public void close() throws Exception { Review Comment: ```suggestion public void markDispatcherDeleting() throws Exception { ``` ########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry; +import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue; + +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the task queue. + * The main responsibilities include: + * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for dispatch. + * 2. Re-queuing tasks that fail to dispatch according to retry logic. + * 3. Ensuring thread safety and correct state transitions during task processing. + */ +@Slf4j +public class WorkerGroupTaskDispatcher extends BaseDaemonThread implements AutoCloseable { + + private final ITaskExecutorClient taskExecutorClient; + + // todo: The current queue is flawed. When a high-priority task fails, + // it will be delayed and will not return to the first or second position. + // Tasks with the same priority will preempt its position. 
+ // If it needs to be placed at the front of the queue, the queue needs to be re-implemented. + private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> workerGroupQueue; + + private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); + + @Getter + private DispatchWorkerStatus status; + + public WorkerGroupTaskDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) { + super("WorkerGroupTaskDispatcher-" + workerGroupName); + this.taskExecutorClient = taskExecutorClient; + this.workerGroupQueue = new PriorityDelayQueue<>(); + status = DispatchWorkerStatus.DEFAULT; + } + + public void add(ITaskExecutionRunnable taskExecutionRunnable, long delayTimeMills) { + workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry(delayTimeMills, taskExecutionRunnable)); + } + + public int size() { + return workerGroupQueue.size(); + } + + @Override + public synchronized void start() { + if (!RUNNING_FLAG.compareAndSet(false, true)) { + log.error("The {} already started, will not start again", this.getName()); + return; + } + log.info("{} starting...", this.getName()); + super.start(); + log.info("{} started...", this.getName()); + } + + @Override + public void close() throws Exception { + if (workerGroupQueue.size() == 0) { + status = DispatchWorkerStatus.DELETE_SUCCESS; + if (RUNNING_FLAG.compareAndSet(true, false)) { + log.info("{} stopping...", this.getName()); + log.info("{} stopped...", this.getName()); + } + } else { + log.warn("The {} queue is not empty, will not stop", this.getName()); + status = DispatchWorkerStatus.DELETING; + } + } + + @Override + public void run() { + while (RUNNING_FLAG.get()) { + this.dispatch(); + } + } + + public void dispatch() { Review Comment: ```suggestion private void dispatch() { ``` ########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry; +import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue; + +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the task queue. + * The main responsibilities include: + * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for dispatch. + * 2. Re-queuing tasks that fail to dispatch according to retry logic. + * 3. Ensuring thread safety and correct state transitions during task processing. 
+ */ +@Slf4j +public class WorkerGroupTaskDispatcher extends BaseDaemonThread implements AutoCloseable { + + private final ITaskExecutorClient taskExecutorClient; + + // todo: The current queue is flawed. When a high-priority task fails, + // it will be delayed and will not return to the first or second position. + // Tasks with the same priority will preempt its position. + // If it needs to be placed at the front of the queue, the queue needs to be re-implemented. + private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> workerGroupQueue; + + private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); + + @Getter + private DispatchWorkerStatus status; + + public WorkerGroupTaskDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) { + super("WorkerGroupTaskDispatcher-" + workerGroupName); + this.taskExecutorClient = taskExecutorClient; + this.workerGroupQueue = new PriorityDelayQueue<>(); + status = DispatchWorkerStatus.DEFAULT; + } + + public void add(ITaskExecutionRunnable taskExecutionRunnable, long delayTimeMills) { + workerGroupQueue.add(new PriorityAndDelayBasedTaskEntry(delayTimeMills, taskExecutionRunnable)); + } + + public int size() { + return workerGroupQueue.size(); + } + + @Override + public synchronized void start() { + if (!RUNNING_FLAG.compareAndSet(false, true)) { + log.error("The {} already started, will not start again", this.getName()); + return; + } + log.info("{} starting...", this.getName()); + super.start(); + log.info("{} started...", this.getName()); + } + + @Override + public void close() throws Exception { + if (workerGroupQueue.size() == 0) { + status = DispatchWorkerStatus.DELETE_SUCCESS; + if (RUNNING_FLAG.compareAndSet(true, false)) { + log.info("{} stopping...", this.getName()); + log.info("{} stopped...", this.getName()); + } + } else { + log.warn("The {} queue is not empty, will not stop", this.getName()); + status = DispatchWorkerStatus.DELETING; + } + } + + @Override + public void run() { 
+ while (RUNNING_FLAG.get()) { + this.dispatch(); + } + } + + public void dispatch() { Review Comment: This method should not be exposed outside this class. ########## dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java: ########## @@ -31,24 +32,25 @@ /** * The class is used to store {@link ITaskExecutionRunnable} which needs to be dispatched. The {@link ITaskExecutionRunnable} - * will be stored in {@link PriorityDelayQueue}, if the {@link ITaskExecutionRunnable}'s delay time is 0, then it will be + * will be stored in {@link DelayQueue}, if the {@link ITaskExecutionRunnable}'s delay time is 0, then it will be * consumed by {@link GlobalTaskDispatchWaitingQueueLooper}. * <p> - * The order of {@link ITaskExecutionRunnable} in the {@link PriorityDelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}. + * The order of {@link ITaskExecutionRunnable} in the {@link DelayQueue} is determined by {@link ITaskExecutionRunnable#compareTo}. */ @Slf4j @Component public class GlobalTaskDispatchWaitingQueue { private final Set<Integer> waitingTaskInstanceIds = ConcurrentHashMap.newKeySet(); - private final PriorityDelayQueue<DelayEntry<ITaskExecutionRunnable>> priorityDelayQueue = - new PriorityDelayQueue<>(); + + private final PriorityBlockingQueue<TimeBasedTaskExecutionRunnableComparableEntry> delayQueue = Review Comment: We should use DelayQueue here instead of PriorityBlockingQueue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
