det101 commented on issue #16260:
URL: 
https://github.com/apache/dolphinscheduler/issues/16260#issuecomment-2614235053

   
1.增加WorkerGroupQueueMap,根据workergroup名称创建DelayQueue队列,队列按任务优先级排序和到期时间排序,优先级高于到期时间。
   
   ```
   public int compareTo(@NotNull Delayed o) {
        // 首先比较优先级
       int priorityComparison = Integer.compare(this.priority, ((Task) 
other).priority);
       if (priorityComparison != 0) {
           return priorityComparison; // 如果优先级不同,直接返回比较结果
       }
   
       // 如果优先级相同,则比较到期时间
       return Long.compare(this.startTime, ((Task) other).startTime);
   }
   ```
   2.原来的GlobalTaskDispatchWaitingQueue 
简化为只按时间排序的队列,根据时间优先级取出taskExecutionRunnable,不再关注任务优先级. 
   3.主要修改GlobalTaskDispatchWaitingQueueLooper.doDispatch 
take的任务根据归属的workergroup放入WorkerGroupQueueMap的DelayQueue;随后从WorkerGroupQueueMap 
poll出每个workergroup的taskExecutionRunnable,如果执行失败重新放入
   
   ```
   doDispatch(){
     //从GlobalTaskDispatchWaitingQueue取出taskExecutionRunnable
     //添加到WorkerGroupQueueMap.add("groupname",taskExecutionRunnable)
   
     //取出所有队列的队首对象
     WorkerGroupQueueMap.poll()
     //遍历处理
     for(String workerGroup : queueMap.keySet()){
        try{
          task = WorkerGroupQueueMap.poll()
          //分发
        }catch(e){
          WorkerGroupQueueMap.add(task.getWorkerGroupName,task)
        }
     }
   }
   
   WorkerGroupQueueMap {
     Map<String, DelayQueue<DelayEntry<ITaskExecutionRunnable>>> queueMap = new 
HashMap<>();
   
     add(groupName,taskExecutionRunnable);
     
     Map<String,ITaskExecutionRunnable> poll() {
           Map<String,ITaskExecutionRunnable> taskExecutionRunnablesMap = new 
HashMap<>();
           for(String workerGroup : queueMap.keySet()){
               DelayQueue<DelayEntry<ITaskExecutionRunnable>> queue = 
queueMap.get(workerGroup);
               taskExecutionRunnablesMap.put(workerGroup, 
queue.poll().getData());
           }
           return taskExecutionRunnablesMap;
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to