ruanwenjun commented on issue #16260:
URL: https://github.com/apache/dolphinscheduler/issues/16260#issuecomment-2639382034

   > 1. Add WorkerGroupQueueMap, which creates one DelayQueue per 
workergroup name. Each queue is sorted by task priority and expiration time, 
with priority taking precedence over expiration time.
   > 
   > ```
   > public int compareTo(@NotNull Delayed other) {
   >     // First, compare priorities
   >     int priorityComparison = Integer.compare(this.priority, ((Task) other).priority);
   >     if (priorityComparison != 0) {
   >         // If priorities differ, return the comparison result directly
   >         return priorityComparison;
   >     }
   > 
   >     // If priorities are the same, compare expiration times
   >     return Long.compare(this.startTime, ((Task) other).startTime);
   > }
   > ```
   > 
   > 2. Simplify the original GlobalTaskDispatchWaitingQueue to a queue sorted 
only by time. Tasks (taskExecutionRunnable) are retrieved based on time 
priority, and task priority is no longer considered.
   > 3. The main modification is in 
GlobalTaskDispatchWaitingQueueLooper.doDispatch. Tasks taken from the queue are 
placed into the DelayQueue of WorkerGroupQueueMap based on their associated 
workergroup. Subsequently, the taskExecutionRunnable is polled from each 
workergroup's queue in WorkerGroupQueueMap. If execution fails, the task is 
re-added to the queue.
   > 
   > ```
   > doDispatch() {
   >     // Retrieve taskExecutionRunnable from GlobalTaskDispatchWaitingQueue
   >     // Add it to WorkerGroupQueueMap.add("groupname", taskExecutionRunnable)
   > 
   >     // Poll the first object from every workergroup queue
   >     Map<String, ITaskExecutionRunnable> tasks = WorkerGroupQueueMap.poll();
   >     // Iterate and process
   >     for (ITaskExecutionRunnable task : tasks.values()) {
   >         try {
   >             // Dispatch the task
   >         } catch (Exception e) {
   >             // Dispatch failed: put the task back into its workergroup queue
   >             WorkerGroupQueueMap.add(task.getWorkerGroupName(), task);
   >         }
   >     }
   > }
   > 
   > WorkerGroupQueueMap {
   >     Map<String, DelayQueue<DelayEntry<ITaskExecutionRunnable>>> queueMap = new HashMap<>();
   > 
   >     add(groupName, taskExecutionRunnable);
   > 
   >     Map<String, ITaskExecutionRunnable> poll() {
   >         Map<String, ITaskExecutionRunnable> taskExecutionRunnablesMap = new HashMap<>();
   >         for (String workerGroup : queueMap.keySet()) {
   >             DelayQueue<DelayEntry<ITaskExecutionRunnable>> queue = queueMap.get(workerGroup);
   >             // DelayQueue.poll() returns null when no entry has expired yet
   >             DelayEntry<ITaskExecutionRunnable> entry = queue.poll();
   >             if (entry != null) {
   >                 taskExecutionRunnablesMap.put(workerGroup, entry.getData());
   >             }
   >         }
   >         return taskExecutionRunnablesMap;
   >     }
   > }
   > ```
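
A minimal, self-contained sketch of the per-workergroup queue described in the quote, to make the ordering concrete. The names `WorkerGroupQueueSketch`, `PrioritizedEntry`, and `QueueMap` are hypothetical and are not existing DolphinScheduler classes; the sketch also assumes that a lower priority value means "more urgent".

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class WorkerGroupQueueSketch {

    /** Hypothetical element: ordered by priority first, then by due time. */
    static final class PrioritizedEntry<T> implements Delayed {
        final T data;
        final int priority;      // assumption: lower value = more urgent
        final long dueAtMillis;

        PrioritizedEntry(T data, int priority, long delayMillis) {
            this.data = data;
            this.priority = priority;
            this.dueAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            PrioritizedEntry<?> other = (PrioritizedEntry<?>) o;
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Long.compare(dueAtMillis, other.dueAtMillis);
        }
    }

    /** One DelayQueue per worker group, created lazily on first add. */
    static final class QueueMap<T> {
        private final Map<String, DelayQueue<PrioritizedEntry<T>>> queues = new ConcurrentHashMap<>();

        void add(String workerGroup, PrioritizedEntry<T> entry) {
            queues.computeIfAbsent(workerGroup, g -> new DelayQueue<>()).add(entry);
        }

        /** Returns the due head of one group's queue, or empty if the head is not due. */
        Optional<T> poll(String workerGroup) {
            DelayQueue<PrioritizedEntry<T>> queue = queues.get(workerGroup);
            PrioritizedEntry<T> head = queue == null ? null : queue.poll();
            return Optional.ofNullable(head).map(e -> e.data);
        }
    }
}
```

One thing the sketch makes visible: `DelayQueue.poll()` only inspects the head, and the head is chosen by `compareTo`. With priority compared first, a high-priority entry that is not yet due hides lower-priority entries that are already due, so `poll` returns nothing for that group until the head expires.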
   
   So there will be two threads? One will consume from the 
`GlobalTaskDispatchWaitingQueue` (which could be named `TaskQueueWithDelay`), 
and the other will consume from the `WorkerGroupQueue` (which could be named 
`WorkGroupTaskQueue`)?
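
To make the question concrete, here is a rough sketch of what a two-thread layout could look like. Everything in it is an assumption for illustration: `TwoStageDispatchSketch`, `Task`, `routeOnce`, `dispatchOnce`, and `startLoop` are hypothetical names, a lower priority value is assumed to be more urgent, and the per-group queue is a plain `PriorityBlockingQueue` on the assumption that delay is already handled by the first stage (the quoted proposal uses a `DelayQueue` there instead).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

final class TwoStageDispatchSketch {

    /** Hypothetical task; in the global queue it is ordered by due time only. */
    static final class Task implements Delayed {
        final String name;
        final String workerGroup;
        final int priority;       // assumption: lower value = more urgent
        final long dueAtMillis;

        Task(String name, String workerGroup, int priority, long delayMillis) {
            this.name = name;
            this.workerGroup = workerGroup;
            this.priority = priority;
            this.dueAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(dueAtMillis, ((Task) o).dueAtMillis);
        }
    }

    final DelayQueue<Task> globalQueue = new DelayQueue<>();
    final Map<String, PriorityBlockingQueue<Task>> groupQueues = new ConcurrentHashMap<>();
    final List<String> dispatched = Collections.synchronizedList(new ArrayList<>());

    /** Stage 1 step: move one due task from the global queue to its group queue. */
    boolean routeOnce() {
        Task task = globalQueue.poll();   // null if no task is due yet
        if (task == null) {
            return false;
        }
        groupQueues.computeIfAbsent(task.workerGroup,
                g -> new PriorityBlockingQueue<>(11, Comparator.comparingInt((Task t) -> t.priority)))
                .add(task);
        return true;
    }

    /** Stage 2 step: dispatch the most urgent task of every group; re-queue on failure. */
    int dispatchOnce() {
        int count = 0;
        for (PriorityBlockingQueue<Task> queue : groupQueues.values()) {
            Task task = queue.poll();
            if (task == null) {
                continue;
            }
            try {
                dispatch(task);
                count++;
            } catch (RuntimeException e) {
                queue.add(task);          // put the task back for a later attempt
            }
        }
        return count;
    }

    /** Placeholder for the real dispatch call; here it only records the task name. */
    void dispatch(Task task) {
        dispatched.add(task.name);
    }

    /** Runs one step in a loop on its own daemon thread until interrupted. */
    Thread startLoop(String threadName, Runnable step) {
        Thread thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                step.run();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, threadName);
        thread.setDaemon(true);
        thread.start();
        return thread;
    }
}
```

With this shape, the two threads would be started as `startLoop("global-queue-consumer", () -> routeOnce())` and `startLoop("worker-group-consumer", () -> dispatchOnce())`, so a slow or failing dispatch in one group does not stall the move of due tasks out of the global queue.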


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
