This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new d89e5de8159 [improvement](tablet scheduler) fix higher priority tablet add failed due to pending queue full #41076 (#41268)
d89e5de8159 is described below

commit d89e5de81599d17a56c70ea05ec13d6baac052c9
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Sep 26 22:31:20 2024 +0800

    [improvement](tablet scheduler) fix higher priority tablet add failed due to pending queue full #41076 (#41268)
    
    cherry pick from #41076
---
 .../clone/ColocateTableCheckerAndBalancer.java     | 11 ++++---
 .../java/org/apache/doris/clone/TabletChecker.java | 12 ++++++--
 .../org/apache/doris/clone/TabletSchedCtx.java     |  4 +--
 .../org/apache/doris/clone/TabletScheduler.java    | 33 +++++++++++++++-----
 .../org/apache/doris/clone/TabletSchedCtxTest.java | 36 ++++++++++++++++++++--
 5 files changed, 77 insertions(+), 19 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 4febec9e922..7727bc77e18 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -561,15 +561,17 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                                     
tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
 
                                     AddResult res = 
tabletScheduler.addTablet(tabletCtx, false /* not force */);
-                                    if (res == AddResult.LIMIT_EXCEED || res 
== AddResult.DISABLED) {
+                                    if (res == AddResult.DISABLED) {
                                         // tablet in scheduler exceed limit, 
or scheduler is disabled,
                                         // skip this group and check next one.
                                         LOG.info("tablet scheduler return: {}. 
stop colocate table check", res.name());
                                         break OUT;
                                     } else if (res == AddResult.ADDED) {
                                         counter.addToSchedulerTabletNum++;
-                                    }  else {
+                                    } else if (res == AddResult.ALREADY_IN) {
                                         counter.tabletInScheduler++;
+                                    } else if (res == AddResult.REPLACE_ADDED 
|| res == AddResult.LIMIT_EXCEED) {
+                                        counter.tabletExceedLimit++;
                                     }
                                 }
                             }
@@ -589,9 +591,10 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
         } // end for groups
 
         long cost = System.currentTimeMillis() - start;
-        LOG.info("finished to check tablets. 
unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms",
+        LOG.info("finished to check tablets. 
unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, "
+                + "cost: {} ms",
                 counter.unhealthyTabletNum, counter.totalTabletNum, 
counter.addToSchedulerTabletNum,
-                counter.tabletInScheduler, counter.tabletNotReady, cost);
+                counter.tabletInScheduler, counter.tabletNotReady, 
counter.tabletExceedLimit, cost);
     }
 
     private GlobalColocateStatistic buildGlobalColocateStatistic() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
index f35282d37b6..78795e54f95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -78,6 +78,7 @@ public class TabletChecker extends MasterDaemon {
             put("added", new AtomicLong(0L));
             put("in_sched", new AtomicLong(0L));
             put("not_ready", new AtomicLong(0L));
+            put("exceed_limit", new AtomicLong(0L));
         }
     };
 
@@ -224,6 +225,7 @@ public class TabletChecker extends MasterDaemon {
         public long addToSchedulerTabletNum = 0;
         public long tabletInScheduler = 0;
         public long tabletNotReady = 0;
+        public long tabletExceedLimit = 0;
     }
 
     private enum LoopControlStatus {
@@ -344,10 +346,12 @@ public class TabletChecker extends MasterDaemon {
         tabletCountByStatus.get("added").set(counter.addToSchedulerTabletNum);
         tabletCountByStatus.get("in_sched").set(counter.tabletInScheduler);
         tabletCountByStatus.get("not_ready").set(counter.tabletNotReady);
+        tabletCountByStatus.get("exceed_limit").set(counter.tabletExceedLimit);
 
-        LOG.info("finished to check tablets. 
unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms",
+        LOG.info("finished to check tablets. 
unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{},"
+                + "cost: {} ms",
                 counter.unhealthyTabletNum, counter.totalTabletNum, 
counter.addToSchedulerTabletNum,
-                counter.tabletInScheduler, counter.tabletNotReady, cost);
+                counter.tabletInScheduler, counter.tabletNotReady, 
counter.tabletExceedLimit, cost);
     }
 
     private LoopControlStatus handlePartitionTablet(Database db, OlapTable 
tbl, Partition partition, boolean isInPrios,
@@ -404,11 +408,13 @@ public class TabletChecker extends MasterDaemon {
                 tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
 
                 AddResult res = tabletScheduler.addTablet(tabletCtx, false /* 
not force */);
-                if (res == AddResult.LIMIT_EXCEED || res == 
AddResult.DISABLED) {
+                if (res == AddResult.DISABLED) {
                     LOG.info("tablet scheduler return: {}. stop tablet 
checker", res.name());
                     return LoopControlStatus.BREAK_OUT;
                 } else if (res == AddResult.ADDED) {
                     counter.addToSchedulerTabletNum++;
+                } else if (res == AddResult.REPLACE_ADDED || res == 
AddResult.LIMIT_EXCEED) {
+                    counter.tabletExceedLimit++;
                 }
             }
         } // indices
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index d004d21f79c..5e29adbb6da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -1334,13 +1334,13 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
                 if (tabletHealth.aliveAndVersionCompleteNum < (replicaNum / 2 
+ 1)) {
                     value -= 3 * baseTime;
                     if (tabletHealth.hasRecentLoadFailed) {
-                        value -= 3 * baseTime;
+                        value -= 4 * baseTime;
                     }
                 }
                 if (tabletHealth.hasAliveAndVersionIncomplete) {
                     value -= 1 * baseTime;
                     if (isUniqKeyMergeOnWrite) {
-                        value -= 1 * baseTime;
+                        value -= 2 * baseTime;
                     }
                 }
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 9768cc764ef..97c9be0e887 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -71,6 +71,7 @@ import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.MinMaxPriorityQueue;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 import org.apache.logging.log4j.LogManager;
@@ -81,7 +82,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 
@@ -120,7 +120,7 @@ public class TabletScheduler extends MasterDaemon {
      *
      * pendingTablets, allTabletTypes, runningTablets and schedHistory are 
protected by 'synchronized'
      */
-    private PriorityQueue<TabletSchedCtx> pendingTablets = new 
PriorityQueue<>();
+    private MinMaxPriorityQueue<TabletSchedCtx> pendingTablets = 
MinMaxPriorityQueue.create();
     private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap();
     // contains all tabletCtxs which state are RUNNING
     private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap();
@@ -149,6 +149,7 @@ public class TabletScheduler extends MasterDaemon {
         ADDED, // success to add
         ALREADY_IN, // already added, skip
         LIMIT_EXCEED, // number of pending tablets exceed the limit
+        REPLACE_ADDED,  // succ to add, and envit a lowest task
         DISABLED // scheduler has been disabled.
     }
 
@@ -268,12 +269,22 @@ public class TabletScheduler extends MasterDaemon {
             return AddResult.ALREADY_IN;
         }
 
+        AddResult addResult = AddResult.ADDED;
         // if this is not a force add,
         // and number of scheduling tablets exceed the limit,
         // refuse to add.
-        if (!force && (pendingTablets.size() > Config.max_scheduling_tablets
-                || runningTablets.size() > Config.max_scheduling_tablets)) {
-            return AddResult.LIMIT_EXCEED;
+        if (!force && (pendingTablets.size() >= Config.max_scheduling_tablets
+                || runningTablets.size() >= Config.max_scheduling_tablets)) {
+            // For a sched tablet, if its compare value is bigger, it will be 
more close to queue's tail position,
+            // and its priority is lower.
+            TabletSchedCtx lowestPriorityTablet = pendingTablets.peekLast();
+            if (lowestPriorityTablet == null || 
lowestPriorityTablet.compareTo(tablet) <= 0) {
+                return AddResult.LIMIT_EXCEED;
+            }
+            addResult = AddResult.REPLACE_ADDED;
+            pendingTablets.pollLast();
+            finalizeTabletCtx(lowestPriorityTablet, 
TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+                    "envit lower priority sched tablet because pending queue 
is full");
         }
 
         if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
@@ -285,7 +296,7 @@ public class TabletScheduler extends MasterDaemon {
             LOG.info("Add tablet to pending queue, {}", tablet);
         }
 
-        return AddResult.ADDED;
+        return addResult;
     }
 
 
@@ -306,11 +317,12 @@ public class TabletScheduler extends MasterDaemon {
      * Iterate current tablets, change their priority to VERY_HIGH if 
necessary.
      */
     public synchronized void changeTabletsPriorityToVeryHigh(long dbId, long 
tblId, List<Long> partitionIds) {
-        PriorityQueue<TabletSchedCtx> newPendingTablets = new 
PriorityQueue<>();
+        MinMaxPriorityQueue<TabletSchedCtx> newPendingTablets = 
MinMaxPriorityQueue.create();
         for (TabletSchedCtx tabletCtx : pendingTablets) {
             if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId
                     && partitionIds.contains(tabletCtx.getPartitionId())) {
                 tabletCtx.setPriority(Priority.VERY_HIGH);
+                tabletCtx.setLastVisitedTime(1L);
             }
             newPendingTablets.add(tabletCtx);
         }
@@ -1745,7 +1757,7 @@ public class TabletScheduler extends MasterDaemon {
             slotNum = 1;
         }
         while (list.size() < Config.schedule_batch_size && slotNum > 0) {
-            TabletSchedCtx tablet = pendingTablets.poll();
+            TabletSchedCtx tablet = pendingTablets.pollFirst();
             if (tablet == null) {
                 // no more tablets
                 break;
@@ -1947,6 +1959,11 @@ public class TabletScheduler extends MasterDaemon {
         });
     }
 
+    // only use for fe ut
+    public MinMaxPriorityQueue<TabletSchedCtx> getPendingTabletQueue() {
+        return pendingTablets;
+    }
+
     public List<List<String>> getPendingTabletsInfo(int limit) {
         return collectTabletCtx(getPendingTablets(limit));
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
index 852f072eca1..a8f48949239 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
@@ -17,25 +17,57 @@
 
 package org.apache.doris.clone;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletSchedCtx.Type;
+import org.apache.doris.common.Config;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.MinMaxPriorityQueue;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.PriorityQueue;
 
 public class TabletSchedCtxTest {
 
+    @Test
+    public void testAddTablet() {
+        List<TabletSchedCtx> tablets = Lists.newArrayList();
+        ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
+        for (long i = 0; i < 20; i++) {
+            tablets.add(new TabletSchedCtx(Type.REPAIR, 1, 2, 3, 4,
+                    i, replicaAlloc, i));
+            tablets.add(new TabletSchedCtx(Type.BALANCE, 1, 2, 3, 4,
+                    1000 + i, replicaAlloc, i));
+        }
+        Collections.shuffle(tablets);
+        Config.max_scheduling_tablets = 5;
+        TabletScheduler scheduler = Env.getCurrentEnv().getTabletScheduler();
+        for (TabletSchedCtx tablet : tablets) {
+            scheduler.addTablet(tablet, false);
+        }
+
+        MinMaxPriorityQueue<TabletSchedCtx> queue = 
scheduler.getPendingTabletQueue();
+        List<TabletSchedCtx> gotTablets = Lists.newArrayList();
+        while (!queue.isEmpty()) {
+            gotTablets.add(queue.pollFirst());
+        }
+        Assert.assertEquals(Config.max_scheduling_tablets, gotTablets.size());
+        for (int i = 0; i < gotTablets.size(); i++) {
+            TabletSchedCtx tablet = gotTablets.get(i);
+            Assert.assertEquals(Type.REPAIR, tablet.getType());
+            Assert.assertEquals((long) i, tablet.getCreateTime());
+        }
+    }
+
     @Test
     public void testPriorityCompare() {
         // equal priority, but info3's last visit time is earlier than info2 
and info1, so info1 should ranks ahead
-        PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>();
+        MinMaxPriorityQueue<TabletSchedCtx> pendingTablets = 
MinMaxPriorityQueue.create();
         ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
         TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR,
                 1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to