ruanwenjun commented on code in PR #17037:
URL:
https://github.com/apache/dolphinscheduler/pull/17037#discussion_r2024919649
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java:
##########
@@ -65,24 +66,20 @@ public void run() {
}
void doDispatch() {
- final ITaskExecutionRunnable taskExecutionRunnable =
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
- final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
- try {
- final TaskExecutionStatus status = taskInstance.getState();
- if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status !=
TaskExecutionStatus.DELAY_EXECUTION) {
Review Comment:
Why did you remove this? Are you sure you understand what this code does? Once it is
removed, a task instance can no longer be removed from the queue after it has been killed.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java:
##########
@@ -65,24 +66,20 @@ public void run() {
}
void doDispatch() {
- final ITaskExecutionRunnable taskExecutionRunnable =
globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
- final TaskInstance taskInstance =
taskExecutionRunnable.getTaskInstance();
- try {
- final TaskExecutionStatus status = taskInstance.getState();
- if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status !=
TaskExecutionStatus.DELAY_EXECUTION) {
- log.warn("The TaskInstance {} state is : {}, will not
dispatch", taskInstance.getName(), status);
- return;
- }
- taskExecutorClient.dispatch(taskExecutionRunnable);
- } catch (Exception e) {
+ ITaskExecutionRunnable taskExecutionRunnable =
+ globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
+ boolean addTaskSuccess =
workerGroupTaskDispatcherManager.addTaskToWorkerGroup(
+ taskExecutionRunnable.getTaskInstance().getWorkerGroup(),
+ taskExecutionRunnable, 0);
+ if (!addTaskSuccess) {
+ log.warn("worker group is deleting or deleted, taskInstance: {}",
taskExecutionRunnable.getTaskInstance());
Review Comment:
```suggestion
log.warn("Dispatch TaskInstance: {} WorkerGrouTaskDispatcher: {}
failed", taskExecutionRunnable.getTaskInstance().getName(),
taskExecutionRunnable.getTaskInstance().getWorkerGroup());
```
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java:
##########
@@ -91,6 +88,7 @@ public void close() throws Exception {
if (RUNNING_FLAG.compareAndSet(true, false)) {
log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
+ workerGroupTaskDispatcherManager.close();
Review Comment:
This should be called before `log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");`.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
+import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the
task queue.
+ * The main responsibilities include:
+ * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for
dispatch.
+ * 2. Re-queuing tasks that fail to dispatch according to retry logic.
+ * 3. Ensuring thread safety and correct state transitions during task
processing.
+ */
+@Slf4j
+public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
+
+ private final ITaskExecutorClient taskExecutorClient;
+
+ // todo: The current queue is flawed. When a high-priority task fails,
+ // it will be delayed and will not return to the first or second position.
+ // Tasks with the same priority will preempt its position.
+ // If it needs to be placed at the front of the queue, the queue needs to
be re-implemented.
+ private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry>
workerGroupQueue;
+
+ private final AtomicReference<DispatchWorkerStatus> status = new
AtomicReference<>(DispatchWorkerStatus.INIT);
+
+ public WorkerGroupTaskDispatcher(String workerGroupName,
ITaskExecutorClient taskExecutorClient) {
+ super("WorkerGroupTaskDispatcher-" + workerGroupName);
+ this.taskExecutorClient = taskExecutorClient;
+ this.workerGroupQueue = new PriorityDelayQueue<>();
+ }
+
+ /**
+ * Adds a task to the worker group queue.
+ *
+ * This method wraps the given task execution object into a priority and
delay-based task entry and adds it to the worker group queue.
+ * The task is only added if the current dispatcher status is either
STARTED or INIT. If the dispatcher is in any other state,
+ * the task addition will fail, and a warning message will be logged.
+ *
+ * @param taskExecutionRunnable The task execution object to add to the
queue, which implements the {@link ITaskExecutionRunnable} interface.
+ * @param delayTimeMills The delay time in milliseconds before the task
should be executed.
+ * @return true if the task was successfully added to the queue, false
otherwise.
+ */
+ public boolean addTaskToWorkerGroupQueue(ITaskExecutionRunnable
taskExecutionRunnable,
+ long delayTimeMills) {
+ if (status.get() == DispatchWorkerStatus.STARTED || status.get() ==
DispatchWorkerStatus.INIT) {
+ workerGroupQueue.add(new
PriorityAndDelayBasedTaskEntry(delayTimeMills, taskExecutionRunnable));
+ return true;
+ } else {
+ log.warn("The {} status is {}, task can not add Queue, it will
fail", this.getName(), status.get());
Review Comment:
```suggestion
log.warn("The WorkerGroupTaskDispatcher: {} status is {}, cannot
receive task: {}", this.getName(), status.get(),
taskExecutionRunnable.getTaskInstance().getName());
```
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcher.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
+import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the
task queue.
+ * The main responsibilities include:
+ * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for
dispatch.
+ * 2. Re-queuing tasks that fail to dispatch according to retry logic.
+ * 3. Ensuring thread safety and correct state transitions during task
processing.
+ */
+@Slf4j
+public class WorkerGroupTaskDispatcher extends BaseDaemonThread {
+
+ private final ITaskExecutorClient taskExecutorClient;
+
+ // todo: The current queue is flawed. When a high-priority task fails,
+ // it will be delayed and will not return to the first or second position.
+ // Tasks with the same priority will preempt its position.
+ // If it needs to be placed at the front of the queue, the queue needs to
be re-implemented.
+ private final PriorityDelayQueue<PriorityAndDelayBasedTaskEntry>
workerGroupQueue;
+
+ private final AtomicReference<DispatchWorkerStatus> status = new
AtomicReference<>(DispatchWorkerStatus.INIT);
+
+ public WorkerGroupTaskDispatcher(String workerGroupName,
ITaskExecutorClient taskExecutorClient) {
+ super("WorkerGroupTaskDispatcher-" + workerGroupName);
+ this.taskExecutorClient = taskExecutorClient;
+ this.workerGroupQueue = new PriorityDelayQueue<>();
+ }
+
+ /**
+ * Adds a task to the worker group queue.
+ *
+ * This method wraps the given task execution object into a priority and
delay-based task entry and adds it to the worker group queue.
+ * The task is only added if the current dispatcher status is either
STARTED or INIT. If the dispatcher is in any other state,
+ * the task addition will fail, and a warning message will be logged.
+ *
+ * @param taskExecutionRunnable The task execution object to add to the
queue, which implements the {@link ITaskExecutionRunnable} interface.
+ * @param delayTimeMills The delay time in milliseconds before the task
should be executed.
+ * @return true if the task was successfully added to the queue, false
otherwise.
+ */
+ public boolean addTaskToWorkerGroupQueue(ITaskExecutionRunnable
taskExecutionRunnable,
+ long delayTimeMills) {
+ if (status.get() == DispatchWorkerStatus.STARTED || status.get() ==
DispatchWorkerStatus.INIT) {
+ workerGroupQueue.add(new
PriorityAndDelayBasedTaskEntry(delayTimeMills, taskExecutionRunnable));
+ return true;
+ } else {
+ log.warn("The {} status is {}, task can not add Queue, it will
fail", this.getName(), status.get());
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized void start() {
+ if (status.compareAndSet(DispatchWorkerStatus.INIT,
DispatchWorkerStatus.STARTED)) {
+ log.info("The {} starting...", this.getName());
+ super.start();
+ log.info("The {} started", this.getName());
+ } else {
+ log.error("The {} status is {}, will not start again",
this.getName(), status.get());
+ }
+ }
+
+ public void markDispatcherClosing() {
+ if (status.get() != DispatchWorkerStatus.CLOSED) {
+ status.set(DispatchWorkerStatus.CLOSING);
+ } else {
+ log.warn("The {} is Closed, will not markDispatcherClosing again",
this.getName());
+ }
+ }
+
+ public void markDispatcherStart() {
+ if (status.compareAndSet(DispatchWorkerStatus.CLOSING,
DispatchWorkerStatus.STARTED)) {
+ log.info("The {} markDispatcherStart...", this.getName());
+ } else {
+ log.warn("The {} status is {}, will not markDispatcherStart",
this.getName(), status.get());
+ }
+ }
Review Comment:
If the thread has already terminated, this method cannot restart the
dispatcher.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.queue;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Getter;
+
+import org.jetbrains.annotations.NotNull;
+
+public class TimeBasedTaskExecutionRunnableComparableEntry<V> implements
Delayed {
Review Comment:
Since this component is only used for `TaskExecutionRunnable`, please don't use a
generic type here; otherwise, bound it as `<V extends ITaskExecutionRunnable>`.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import
org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupTaskDispatcherManager implements AutoCloseable,
WorkerGroupChangeNotifier.WorkerGroupListener {
+
+ @Autowired
+ private ITaskExecutorClient taskExecutorClient;
+
+ @Getter
+ private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher>
dispatchWorkerMap;
+
+ public WorkerGroupTaskDispatcherManager() {
+ dispatchWorkerMap = new ConcurrentHashMap<>();
+ }
+
+ @PostConstruct
+ public void init() {
+ this.addWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
+ }
+
+ /**
+ * Adds a task to the specified worker group queue and starts or wakes up
the corresponding processing loop.
+ *
+ * @param workerGroup the identifier for the worker group, used to
distinguish different task queues
+ * @param taskExecutionRunnable an instance of ITaskExecutionRunnable
representing the task to be executed
+ * @param delayTimeMills the delay time before the task is executed, in
milliseconds
+ */
+ public synchronized boolean addTaskToWorkerGroup(String workerGroup,
ITaskExecutionRunnable taskExecutionRunnable,
+ long delayTimeMills) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.get(workerGroup);
+ if (workerGroupTaskDispatcher != null) {
+ if (workerGroupTaskDispatcher.checkCloseDispatchWorkerComplete()) {
+ dispatchWorkerMap.remove(workerGroup);
+ return false;
+ }
+ return
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable,
delayTimeMills);
+ } else {
+ log.error("workerGroupTaskDispatcher {} not found, will set task
{} fail",
+ workerGroup,
taskExecutionRunnable.getTaskInstance().getId());
+ }
+ return false;
+ }
+
+ /**
+ * Stops a specific worker group's task dispatch waiting queue looper.
+ *
+ * @param workerGroup the identifier for the worker group
+ */
+ private synchronized void deleteWorkerGroup(String workerGroup) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.get(workerGroup);
+ if (workerGroupTaskDispatcher != null) {
+ workerGroupTaskDispatcher.markDispatcherClosing();
+ } else {
+ log.warn("workerGroupTaskDispatcher {} not found", workerGroup);
+ }
+ }
+
+ /**
+ * add workerGroup
+ *
+ * @param workerGroup the identifier for the worker group
+ */
+ private synchronized void addWorkerGroup(String workerGroup) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.get(workerGroup);
+ if (workerGroupTaskDispatcher == null) {
+ workerGroupTaskDispatcher = new
WorkerGroupTaskDispatcher(workerGroup, taskExecutorClient);
+ dispatchWorkerMap.put(workerGroup, workerGroupTaskDispatcher);
+ workerGroupTaskDispatcher.start();
+ } else {
+ workerGroupTaskDispatcher.markDispatcherStart();
Review Comment:
This has a concurrency problem if the dispatcher has just closed its loop.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import
org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupTaskDispatcherManager implements AutoCloseable,
WorkerGroupChangeNotifier.WorkerGroupListener {
+
+ @Autowired
+ private ITaskExecutorClient taskExecutorClient;
+
+ @Getter
+ private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher>
dispatchWorkerMap;
+
+ public WorkerGroupTaskDispatcherManager() {
+ dispatchWorkerMap = new ConcurrentHashMap<>();
+ }
+
+ @PostConstruct
+ public void init() {
+ this.addWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
+ }
+
+ /**
+ * Adds a task to the specified worker group queue and starts or wakes up
the corresponding processing loop.
+ *
+ * @param workerGroup the identifier for the worker group, used to
distinguish different task queues
+ * @param taskExecutionRunnable an instance of ITaskExecutionRunnable
representing the task to be executed
+ * @param delayTimeMills the delay time before the task is executed, in
milliseconds
+ */
+ public synchronized boolean addTaskToWorkerGroup(String workerGroup,
ITaskExecutionRunnable taskExecutionRunnable,
+ long delayTimeMills) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.get(workerGroup);
+ if (workerGroupTaskDispatcher != null) {
+ if (workerGroupTaskDispatcher.checkCloseDispatchWorkerComplete()) {
+ dispatchWorkerMap.remove(workerGroup);
+ return false;
+ }
+ return
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable,
delayTimeMills);
+ } else {
+ log.error("workerGroupTaskDispatcher {} not found, will set task
{} fail",
+ workerGroup,
taskExecutionRunnable.getTaskInstance().getId());
+ }
+ return false;
+ }
+
+ /**
+ * Stops a specific worker group's task dispatch waiting queue looper.
+ *
+ * @param workerGroup the identifier for the worker group
+ */
+ private synchronized void deleteWorkerGroup(String workerGroup) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.get(workerGroup);
+ if (workerGroupTaskDispatcher != null) {
+ workerGroupTaskDispatcher.markDispatcherClosing();
+ } else {
+ log.warn("workerGroupTaskDispatcher {} not found", workerGroup);
+ }
+ }
+
+ /**
+ * add workerGroup
+ *
+ * @param workerGroup the identifier for the worker group
+ */
+ private synchronized void addWorkerGroup(String workerGroup) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.get(workerGroup);
+ if (workerGroupTaskDispatcher == null) {
+ workerGroupTaskDispatcher = new
WorkerGroupTaskDispatcher(workerGroup, taskExecutorClient);
+ dispatchWorkerMap.put(workerGroup, workerGroupTaskDispatcher);
+ workerGroupTaskDispatcher.start();
+ } else {
+ workerGroupTaskDispatcher.markDispatcherStart();
+ }
+ }
+
+ /**
+ * Stop all workerGroupTaskDispatchWaitingQueueLooper
+ */
+ @Override
+ public void close() throws Exception {
+ log.info("WorkerGroupTaskDispatcherManager start close");
+ for (Map.Entry<String, WorkerGroupTaskDispatcher> entry :
dispatchWorkerMap.entrySet()) {
+ try {
+ entry.getValue().markDispatcherClosing();
+ } catch (Exception e) {
+ log.error("stop worker group error", e);
+ }
+ }
+ log.info("WorkerGroupTaskDispatcherManager closed");
+ }
+
+ @Override
+ public void onWorkerGroupAdd(List<WorkerGroup> workerGroups) {
+ this.checkAndRemoveClosedDispatchWorker();
+ for (WorkerGroup workerGroup : workerGroups) {
+ this.addWorkerGroup(workerGroup.getName());
+ }
+ }
+
+ @Override
+ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
+ this.checkAndRemoveClosedDispatchWorker();
+ String workerGroupsString = workerGroups.stream()
+ .map(WorkerGroup::getName)
+ .collect(Collectors.joining(", "));
+ log.info("Worker groups: {}", workerGroupsString);
+ }
Review Comment:
The logic is too strange, and the log message is meaningless: no one can understand
what it means unless they are already familiar with the code.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import
org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupTaskDispatcherManager implements AutoCloseable,
WorkerGroupChangeNotifier.WorkerGroupListener {
+
+ @Autowired
+ private ITaskExecutorClient taskExecutorClient;
+
+ @Getter
+ private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher>
dispatchWorkerMap;
+
+ public WorkerGroupTaskDispatcherManager() {
+ dispatchWorkerMap = new ConcurrentHashMap<>();
+ }
+
+ @PostConstruct
+ public void init() {
+ this.addWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
+ }
+
+ /**
+ * Adds a task to the specified worker group queue and starts or wakes up
the corresponding processing loop.
+ *
+ * @param workerGroup the identifier for the worker group, used to
distinguish different task queues
+ * @param taskExecutionRunnable an instance of ITaskExecutionRunnable
representing the task to be executed
+ * @param delayTimeMills the delay time before the task is executed, in
milliseconds
+ */
+ public synchronized boolean addTaskToWorkerGroup(String workerGroup,
ITaskExecutionRunnable taskExecutionRunnable,
+ long delayTimeMills) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.get(workerGroup);
+ if (workerGroupTaskDispatcher != null) {
+ if (workerGroupTaskDispatcher.checkCloseDispatchWorkerComplete()) {
+ dispatchWorkerMap.remove(workerGroup);
+ return false;
+ }
+ return
workerGroupTaskDispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable,
delayTimeMills);
+ } else {
+ log.error("workerGroupTaskDispatcher {} not found, will set task
{} fail",
+ workerGroup,
taskExecutionRunnable.getTaskInstance().getId());
+ }
+ return false;
+ }
+
+ /**
+ * Stops a specific worker group's task dispatch waiting queue looper.
+ *
+ * @param workerGroup the identifier for the worker group
+ */
+ private synchronized void deleteWorkerGroup(String workerGroup) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.get(workerGroup);
+ if (workerGroupTaskDispatcher != null) {
+ workerGroupTaskDispatcher.markDispatcherClosing();
+ } else {
+ log.warn("workerGroupTaskDispatcher {} not found", workerGroup);
+ }
+ }
+
+ /**
+ * add workerGroup
+ *
+ * @param workerGroup the identifier for the worker group
+ */
+ private synchronized void addWorkerGroup(String workerGroup) {
+ WorkerGroupTaskDispatcher workerGroupTaskDispatcher =
dispatchWorkerMap.get(workerGroup);
+ if (workerGroupTaskDispatcher == null) {
+ workerGroupTaskDispatcher = new
WorkerGroupTaskDispatcher(workerGroup, taskExecutorClient);
+ dispatchWorkerMap.put(workerGroup, workerGroupTaskDispatcher);
+ workerGroupTaskDispatcher.start();
+ } else {
+ workerGroupTaskDispatcher.markDispatcherStart();
+ }
+ }
+
+ /**
+ * Stop all workerGroupTaskDispatchWaitingQueueLooper
+ */
+ @Override
+ public void close() throws Exception {
+ log.info("WorkerGroupTaskDispatcherManager start close");
+ for (Map.Entry<String, WorkerGroupTaskDispatcher> entry :
dispatchWorkerMap.entrySet()) {
+ try {
+ entry.getValue().markDispatcherClosing();
+ } catch (Exception e) {
+ log.error("stop worker group error", e);
+ }
+ }
+ log.info("WorkerGroupTaskDispatcherManager closed");
+ }
+
+ @Override
+ public void onWorkerGroupAdd(List<WorkerGroup> workerGroups) {
+ this.checkAndRemoveClosedDispatchWorker();
+ for (WorkerGroup workerGroup : workerGroups) {
+ this.addWorkerGroup(workerGroup.getName());
+ }
+ }
+
+ @Override
+ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
+ this.checkAndRemoveClosedDispatchWorker();
+ String workerGroupsString = workerGroups.stream()
+ .map(WorkerGroup::getName)
+ .collect(Collectors.joining(", "));
+ log.info("Worker groups: {}", workerGroupsString);
+ }
+
+ @Override
+ public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
+ for (WorkerGroup workerGroup : workerGroups) {
+ try {
+ this.deleteWorkerGroup(workerGroup.getName());
+ } catch (Exception e) {
+ log.error("stop worker group error", e);
Review Comment:
```suggestion
log.error("Delete worker group: {} from
WorkerGroupTaskDispatcherManager error", workerGroup.getName(), e);
```
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkerGroupTaskDispatcherManager.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import
org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
+import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * WorkerGroupTaskDispatcherManager is responsible for managing the task
dispatching for worker groups.
+ * It maintains a mapping of worker groups to their task dispatchers and
priority delay queues,
+ * and supports adding tasks, starting and stopping worker groups, as well as
cleaning up resources upon shutdown.
+ */
+@Component
+@Slf4j
+public class WorkerGroupTaskDispatcherManager implements AutoCloseable,
WorkerGroupChangeNotifier.WorkerGroupListener {
+
+ @Autowired
+ private ITaskExecutorClient taskExecutorClient; // handed to every WorkerGroupTaskDispatcher this manager creates
+
+ @Getter
+ private final ConcurrentHashMap<String, WorkerGroupTaskDispatcher>
dispatchWorkerMap; // worker group name -> its dispatcher; Lombok @Getter exposes this live map
+
+    public WorkerGroupTaskDispatcherManager() {
+        // Starts empty; the default worker group is registered later in init().
+        this.dispatchWorkerMap = new ConcurrentHashMap<>();
+    }
+
+ @PostConstruct
+ public void init() {
+ this.addWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
+ }
+
+    /**
+     * Hands a task to the dispatcher registered for the given worker group.
+     *
+     * <p>Outcomes:
+     * <ul>
+     *   <li>no dispatcher registered: an error is logged and {@code false} is returned;</li>
+     *   <li>dispatcher reports its close as complete: it is removed from the map and
+     *       {@code false} is returned;</li>
+     *   <li>otherwise: the task is passed to the dispatcher's queue.</li>
+     * </ul>
+     *
+     * @param workerGroup           name of the worker group whose dispatcher should receive the task
+     * @param taskExecutionRunnable the task to be dispatched
+     * @param delayTimeMills        delay in milliseconds, passed through to the dispatcher's queue
+     * @return the result of {@code addTaskToWorkerGroupQueue} when a live dispatcher exists,
+     *         {@code false} otherwise
+     */
+    public synchronized boolean addTaskToWorkerGroup(String workerGroup,
+                                                     ITaskExecutionRunnable taskExecutionRunnable,
+                                                     long delayTimeMills) {
+        WorkerGroupTaskDispatcher dispatcher = dispatchWorkerMap.get(workerGroup);
+        if (dispatcher == null) {
+            log.error("workerGroupTaskDispatcher {} not found, will set task {} fail",
+                    workerGroup, taskExecutionRunnable.getTaskInstance().getId());
+            return false;
+        }
+        if (dispatcher.checkCloseDispatchWorkerComplete()) {
+            // Drop the finished dispatcher; a later addWorkerGroup() for this name creates a new one.
+            dispatchWorkerMap.remove(workerGroup);
+            return false;
+        }
+        return dispatcher.addTaskToWorkerGroupQueue(taskExecutionRunnable, delayTimeMills);
+    }
+
+    /**
+     * Marks the dispatcher of the given worker group as closing, or logs a warning when no
+     * dispatcher is registered for it. The dispatcher is not removed from the map here.
+     *
+     * @param workerGroup name of the worker group
+     */
+    private synchronized void deleteWorkerGroup(String workerGroup) {
+        WorkerGroupTaskDispatcher dispatcher = dispatchWorkerMap.get(workerGroup);
+        if (dispatcher == null) {
+            log.warn("workerGroupTaskDispatcher {} not found", workerGroup);
+            return;
+        }
+        dispatcher.markDispatcherClosing();
+    }
+
+    /**
+     * Ensures a dispatcher exists and is running for the given worker group.
+     *
+     * <p>If a dispatcher is already registered, {@code markDispatcherStart()} is called on it;
+     * otherwise a new dispatcher is created with the shared task executor client, stored in the
+     * map, and started.
+     *
+     * @param workerGroup name of the worker group
+     */
+    private synchronized void addWorkerGroup(String workerGroup) {
+        WorkerGroupTaskDispatcher existing = dispatchWorkerMap.get(workerGroup);
+        if (existing != null) {
+            existing.markDispatcherStart();
+            return;
+        }
+        WorkerGroupTaskDispatcher created = new WorkerGroupTaskDispatcher(workerGroup, taskExecutorClient);
+        dispatchWorkerMap.put(workerGroup, created);
+        created.start();
+    }
+
+ /**
+ * Stop all workerGroupTaskDispatchWaitingQueueLooper
+ */
+ @Override
+ public void close() throws Exception {
+ log.info("WorkerGroupTaskDispatcherManager start close");
+ for (Map.Entry<String, WorkerGroupTaskDispatcher> entry :
dispatchWorkerMap.entrySet()) {
+ try {
+ entry.getValue().markDispatcherClosing();
+ } catch (Exception e) {
+ log.error("stop worker group error", e);
+ }
+ }
+ log.info("WorkerGroupTaskDispatcherManager closed");
Review Comment:
In fact, you should close the dispatcher here, not just mark it as closing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]