This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new bc6759fc8d9 [branch-2.0](routine-load) optimize routine load task schedule to make consume real-time (#31273) (#37431) bc6759fc8d9 is described below commit bc6759fc8d9768238117f05942609ea83c86a720 Author: hui lai <1353307...@qq.com> AuthorDate: Mon Jul 8 17:16:39 2024 +0800 [branch-2.0](routine-load) optimize routine load task schedule to make consume real-time (#31273) (#37431) pick from: #31273 --- .../doris/load/routineload/RoutineLoadTaskScheduler.java | 13 ++++++------- .../load/routineload/RoutineLoadTaskSchedulerTest.java | 7 +++---- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 10eec92b5e3..e310116fbd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -38,12 +38,11 @@ import org.apache.doris.thrift.TStatusCode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import com.google.common.collect.Queues; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; /** * Routine load task scheduler is a function which allocate task to be. 
@@ -61,7 +60,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { private static final long SLOT_FULL_SLEEP_MS = 10000; // 10s private RoutineLoadManager routineLoadManager; - private LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue = Queues.newLinkedBlockingQueue(); + private LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue = new LinkedBlockingDeque<>(); private long lastBackendSlotUpdateTime = -1; @@ -105,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime() < routineLoadTaskInfo.getTimeoutMs()) { // try to delay scheduling this task for 'timeout', to void too many failure - needScheduleTasksQueue.put(routineLoadTaskInfo); + needScheduleTasksQueue.addLast(routineLoadTaskInfo); return; } scheduleOneTask(routineLoadTaskInfo); @@ -131,7 +130,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { try { // check if topic has more data to consume if (!routineLoadTaskInfo.hasMoreDataToConsume()) { - needScheduleTasksQueue.put(routineLoadTaskInfo); + needScheduleTasksQueue.addLast(routineLoadTaskInfo); return; } @@ -139,7 +138,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated. if (!allocateTaskToBe(routineLoadTaskInfo)) { // allocate failed, push it back to the queue to wait next scheduling - needScheduleTasksQueue.put(routineLoadTaskInfo); + needScheduleTasksQueue.addFirst(routineLoadTaskInfo); return; } } catch (UserException e) { @@ -162,7 +161,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { // begin txn failed. 
push it back to the queue to wait next scheduling // set BE id to -1 to release the BE slot routineLoadTaskInfo.setBeId(-1); - needScheduleTasksQueue.put(routineLoadTaskInfo); + needScheduleTasksQueue.addFirst(routineLoadTaskInfo); return; } } catch (Exception e) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java index 02db47538fb..0ce694bfb11 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -32,15 +32,14 @@ import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.GlobalTransactionMgr; import com.google.common.collect.Maps; -import com.google.common.collect.Queues; import mockit.Expectations; import mockit.Injectable; import mockit.Mocked; import org.junit.Test; import java.util.Map; -import java.util.Queue; import java.util.UUID; +import java.util.concurrent.LinkedBlockingDeque; public class RoutineLoadTaskSchedulerTest { @@ -68,10 +67,10 @@ public class RoutineLoadTaskSchedulerTest { KafkaProgress kafkaProgress = new KafkaProgress(); Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset); - Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue(); + LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = new LinkedBlockingDeque<>(); KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", 20000, 0, partitionIdToOffset, false); - routineLoadTaskInfoQueue.add(routineLoadTaskInfo1); + routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1); Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap(); idToRoutineLoadTask.put(1L, routineLoadTaskInfo1); 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org