This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new bc6759fc8d9 [branch-2.0](routine-load) optimize routine load task schedule to make consume real-time (#31273) (#37431)
bc6759fc8d9 is described below

commit bc6759fc8d9768238117f05942609ea83c86a720
Author: hui lai <1353307...@qq.com>
AuthorDate: Mon Jul 8 17:16:39 2024 +0800

    [branch-2.0](routine-load) optimize routine load task schedule to make consume real-time (#31273) (#37431)
    
    pick from: #31273
---
 .../doris/load/routineload/RoutineLoadTaskScheduler.java    | 13 ++++++-------
 .../load/routineload/RoutineLoadTaskSchedulerTest.java      |  7 +++----
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 10eec92b5e3..e310116fbd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -38,12 +38,11 @@ import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 /**
  * Routine load task scheduler is a function which allocate task to be.
@@ -61,7 +60,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
     private static final long SLOT_FULL_SLEEP_MS = 10000; // 10s
 
     private RoutineLoadManager routineLoadManager;
-    private LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue = Queues.newLinkedBlockingQueue();
+    private LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue = new LinkedBlockingDeque<>();
 
     private long lastBackendSlotUpdateTime = -1;
 
@@ -105,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
                     < routineLoadTaskInfo.getTimeoutMs()) {
                 // try to delay scheduling this task for 'timeout', to void too many failure
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addLast(routineLoadTaskInfo);
                 return;
             }
             scheduleOneTask(routineLoadTaskInfo);
@@ -131,7 +130,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
         try {
             // check if topic has more data to consume
             if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addLast(routineLoadTaskInfo);
                 return;
             }
 
@@ -139,7 +138,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
             if (!allocateTaskToBe(routineLoadTaskInfo)) {
                 // allocate failed, push it back to the queue to wait next scheduling
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
                 return;
             }
         } catch (UserException e) {
@@ -162,7 +161,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
                 // begin txn failed. push it back to the queue to wait next scheduling
                 // set BE id to -1 to release the BE slot
                 routineLoadTaskInfo.setBeId(-1);
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
                 return;
             }
         } catch (Exception e) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 02db47538fb..0ce694bfb11 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -32,15 +32,14 @@ import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mocked;
 import org.junit.Test;
 
 import java.util.Map;
-import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.LinkedBlockingDeque;
 
 public class RoutineLoadTaskSchedulerTest {
 
@@ -68,10 +67,10 @@ public class RoutineLoadTaskSchedulerTest {
         KafkaProgress kafkaProgress = new KafkaProgress();
         Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);
 
-        Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
+        LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = new LinkedBlockingDeque<>();
         KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", 20000, 0,
                 partitionIdToOffset, false);
-        routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
+        routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
 
         Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
         idToRoutineLoadTask.put(1L, routineLoadTaskInfo1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to