This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new d89e5de8159 [improvement](tablet scheduler) fix higher priority tablet add failed due to pending queue full #41076 (#41268) d89e5de8159 is described below commit d89e5de81599d17a56c70ea05ec13d6baac052c9 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Thu Sep 26 22:31:20 2024 +0800 [improvement](tablet scheduler) fix higher priority tablet add failed due to pending queue full #41076 (#41268) cherry pick from #41076 --- .../clone/ColocateTableCheckerAndBalancer.java | 11 ++++--- .../java/org/apache/doris/clone/TabletChecker.java | 12 ++++++-- .../org/apache/doris/clone/TabletSchedCtx.java | 4 +-- .../org/apache/doris/clone/TabletScheduler.java | 33 +++++++++++++++----- .../org/apache/doris/clone/TabletSchedCtxTest.java | 36 ++++++++++++++++++++-- 5 files changed, 77 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 4febec9e922..7727bc77e18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -561,15 +561,17 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite); AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */); - if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) { + if (res == AddResult.DISABLED) { // tablet in scheduler exceed limit, or scheduler is disabled, // skip this group and check next one. LOG.info("tablet scheduler return: {}. stop colocate table check", res.name()); break OUT; } else if (res == AddResult.ADDED) { counter.addToSchedulerTabletNum++; - } else { + } else if (res == AddResult.ALREADY_IN) { counter.tabletInScheduler++; + } else if (res == AddResult.REPLACE_ADDED || res == AddResult.LIMIT_EXCEED) { + counter.tabletExceedLimit++; } } } @@ -589,9 +591,10 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { } // end for groups long cost = System.currentTimeMillis() - start; - LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms", + LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, " + + "cost: {} ms", counter.unhealthyTabletNum, counter.totalTabletNum, counter.addToSchedulerTabletNum, - counter.tabletInScheduler, counter.tabletNotReady, cost); + counter.tabletInScheduler, counter.tabletNotReady, counter.tabletExceedLimit, cost); } private GlobalColocateStatistic buildGlobalColocateStatistic() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java index f35282d37b6..78795e54f95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -78,6 +78,7 @@ public class TabletChecker extends MasterDaemon { put("added", new AtomicLong(0L)); put("in_sched", new AtomicLong(0L)); put("not_ready", new AtomicLong(0L)); + put("exceed_limit", new AtomicLong(0L)); } }; @@ -224,6 +225,7 @@ public class TabletChecker extends MasterDaemon { public long addToSchedulerTabletNum = 0; public long tabletInScheduler = 0; public long tabletNotReady = 0; + public long tabletExceedLimit = 0; } private enum LoopControlStatus { @@ -344,10 +346,12 @@ public class TabletChecker extends MasterDaemon { tabletCountByStatus.get("added").set(counter.addToSchedulerTabletNum); tabletCountByStatus.get("in_sched").set(counter.tabletInScheduler); tabletCountByStatus.get("not_ready").set(counter.tabletNotReady); + tabletCountByStatus.get("exceed_limit").set(counter.tabletExceedLimit); - LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms", + LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}," + + "cost: {} ms", counter.unhealthyTabletNum, counter.totalTabletNum, counter.addToSchedulerTabletNum, - counter.tabletInScheduler, counter.tabletNotReady, cost); + counter.tabletInScheduler, counter.tabletNotReady, counter.tabletExceedLimit, cost); } private LoopControlStatus handlePartitionTablet(Database db, OlapTable tbl, Partition partition, boolean isInPrios, @@ -404,11 +408,13 @@ public class TabletChecker extends MasterDaemon { tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite); AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */); - if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) { + if (res == AddResult.DISABLED) { LOG.info("tablet scheduler return: {}. stop tablet checker", res.name()); return LoopControlStatus.BREAK_OUT; } else if (res == AddResult.ADDED) { counter.addToSchedulerTabletNum++; + } else if (res == AddResult.REPLACE_ADDED || res == AddResult.LIMIT_EXCEED) { + counter.tabletExceedLimit++; } } } // indices diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index d004d21f79c..5e29adbb6da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -1334,13 +1334,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { if (tabletHealth.aliveAndVersionCompleteNum < (replicaNum / 2 + 1)) { value -= 3 * baseTime; if (tabletHealth.hasRecentLoadFailed) { - value -= 3 * baseTime; + value -= 4 * baseTime; } } if (tabletHealth.hasAliveAndVersionIncomplete) { value -= 1 * baseTime; if (isUniqKeyMergeOnWrite) { - value -= 1 * baseTime; + value -= 2 * baseTime; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 9768cc764ef..97c9be0e887 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -71,6 +71,7 @@ import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Sets; import com.google.common.collect.Table; import org.apache.logging.log4j.LogManager; @@ -81,7 +82,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; @@ -120,7 +120,7 @@ public class TabletScheduler extends MasterDaemon { * * pendingTablets, allTabletTypes, runningTablets and schedHistory are protected by 'synchronized' */ - private PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>(); + private MinMaxPriorityQueue<TabletSchedCtx> pendingTablets = MinMaxPriorityQueue.create(); private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap(); // contains all tabletCtxs which state are RUNNING private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap(); @@ -149,6 +149,7 @@ public class TabletScheduler extends MasterDaemon { ADDED, // success to add ALREADY_IN, // already added, skip LIMIT_EXCEED, // number of pending tablets exceed the limit + REPLACE_ADDED, // succ to add, and envit a lowest task DISABLED // scheduler has been disabled. } @@ -268,12 +269,22 @@ public class TabletScheduler extends MasterDaemon { return AddResult.ALREADY_IN; } + AddResult addResult = AddResult.ADDED; // if this is not a force add, // and number of scheduling tablets exceed the limit, // refuse to add. - if (!force && (pendingTablets.size() > Config.max_scheduling_tablets - || runningTablets.size() > Config.max_scheduling_tablets)) { - return AddResult.LIMIT_EXCEED; + if (!force && (pendingTablets.size() >= Config.max_scheduling_tablets + || runningTablets.size() >= Config.max_scheduling_tablets)) { + // For a sched tablet, if its compare value is bigger, it will be more close to queue's tail position, + // and its priority is lower. + TabletSchedCtx lowestPriorityTablet = pendingTablets.peekLast(); + if (lowestPriorityTablet == null || lowestPriorityTablet.compareTo(tablet) <= 0) { + return AddResult.LIMIT_EXCEED; + } + addResult = AddResult.REPLACE_ADDED; + pendingTablets.pollLast(); + finalizeTabletCtx(lowestPriorityTablet, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, + "envit lower priority sched tablet because pending queue is full"); } if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) { @@ -285,7 +296,7 @@ public class TabletScheduler extends MasterDaemon { LOG.info("Add tablet to pending queue, {}", tablet); } - return AddResult.ADDED; + return addResult; } @@ -306,11 +317,12 @@ public class TabletScheduler extends MasterDaemon { * Iterate current tablets, change their priority to VERY_HIGH if necessary. */ public synchronized void changeTabletsPriorityToVeryHigh(long dbId, long tblId, List<Long> partitionIds) { - PriorityQueue<TabletSchedCtx> newPendingTablets = new PriorityQueue<>(); + MinMaxPriorityQueue<TabletSchedCtx> newPendingTablets = MinMaxPriorityQueue.create(); for (TabletSchedCtx tabletCtx : pendingTablets) { if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId && partitionIds.contains(tabletCtx.getPartitionId())) { tabletCtx.setPriority(Priority.VERY_HIGH); + tabletCtx.setLastVisitedTime(1L); } newPendingTablets.add(tabletCtx); } @@ -1745,7 +1757,7 @@ public class TabletScheduler extends MasterDaemon { slotNum = 1; } while (list.size() < Config.schedule_batch_size && slotNum > 0) { - TabletSchedCtx tablet = pendingTablets.poll(); + TabletSchedCtx tablet = pendingTablets.pollFirst(); if (tablet == null) { // no more tablets break; @@ -1947,6 +1959,11 @@ public class TabletScheduler extends MasterDaemon { }); } + // only use for fe ut + public MinMaxPriorityQueue<TabletSchedCtx> getPendingTabletQueue() { + return pendingTablets; + } + public List<List<String>> getPendingTabletsInfo(int limit) { return collectTabletCtx(getPendingTablets(limit)); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java index 852f072eca1..a8f48949239 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java @@ -17,25 +17,57 @@ package org.apache.doris.clone; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletSchedCtx.Type; +import org.apache.doris.common.Config; import com.google.common.collect.Lists; +import com.google.common.collect.MinMaxPriorityQueue; import org.junit.Assert; import org.junit.Test; import java.util.Collections; import java.util.List; -import java.util.PriorityQueue; public class TabletSchedCtxTest { + @Test + public void testAddTablet() { + List<TabletSchedCtx> tablets = Lists.newArrayList(); + ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; + for (long i = 0; i < 20; i++) { + tablets.add(new TabletSchedCtx(Type.REPAIR, 1, 2, 3, 4, + i, replicaAlloc, i)); + tablets.add(new TabletSchedCtx(Type.BALANCE, 1, 2, 3, 4, + 1000 + i, replicaAlloc, i)); + } + Collections.shuffle(tablets); + Config.max_scheduling_tablets = 5; + TabletScheduler scheduler = Env.getCurrentEnv().getTabletScheduler(); + for (TabletSchedCtx tablet : tablets) { + scheduler.addTablet(tablet, false); + } + + MinMaxPriorityQueue<TabletSchedCtx> queue = scheduler.getPendingTabletQueue(); + List<TabletSchedCtx> gotTablets = Lists.newArrayList(); + while (!queue.isEmpty()) { + gotTablets.add(queue.pollFirst()); + } + Assert.assertEquals(Config.max_scheduling_tablets, gotTablets.size()); + for (int i = 0; i < gotTablets.size(); i++) { + TabletSchedCtx tablet = gotTablets.get(i); + Assert.assertEquals(Type.REPAIR, tablet.getType()); + Assert.assertEquals((long) i, tablet.getCreateTime()); + } + } + @Test public void testPriorityCompare() { // equal priority, but info3's last visit time is earlier than info2 and info1, so info1 should ranks ahead - PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>(); + MinMaxPriorityQueue<TabletSchedCtx> pendingTablets = MinMaxPriorityQueue.create(); ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR, 1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org