This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e58c6d0e872cede53d060774ec58fb7859184204
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Wed Feb 7 07:02:41 2024 +0800

    [improvement](balance) fix multiple problems for balance on large cluster 
(#30713)
---
 .../main/java/org/apache/doris/common/Config.java  |  31 +++-
 .../main/java/org/apache/doris/common/Pair.java    |   2 +-
 .../apache/doris/catalog/TabletInvertedIndex.java  |  19 ++-
 .../apache/doris/clone/BackendLoadStatistic.java   |  68 ++++++--
 .../org/apache/doris/clone/BeLoadRebalancer.java   | 190 +++++++++++++++------
 .../apache/doris/clone/LoadStatisticForTag.java    | 166 +++++++++++++++++-
 .../apache/doris/clone/RootPathLoadStatistic.java  |  30 +++-
 .../org/apache/doris/clone/TabletScheduler.java    |  96 ++++++++---
 .../common/proc/BackendLoadStatisticProcNode.java  |   2 +-
 .../common/proc/ClusterLoadStatisticProcDir.java   |   2 +-
 .../doris/clone/ClusterLoadStatisticsTest.java     |  72 +++++---
 11 files changed, 544 insertions(+), 134 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a7f642944ed..5a239a8c407 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1014,6 +1014,30 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static double balance_load_score_threshold = 0.1; // 10%
 
+    // if disk usage > balance_load_score_threshold + 
urgent_balance_disk_usage_extra_threshold
+    // then this disk needs to be scheduled quickly.
+    // this value could be less than 0.
+    @ConfField(mutable = true, masterOnly = true)
+    public static double urgent_balance_disk_usage_extra_threshold = 0.05;
+
+    // when running urgent disk balance, shuffle this percentage of the largest tablets
+    // range: [ 0 ~ 100 ]
+    @ConfField(mutable = true, masterOnly = true)
+    public static int urgent_balance_shuffle_large_tablet_percentage = 1;
+
+    @ConfField(mutable = true, masterOnly = true)
+    public static double urgent_balance_pick_large_tablet_num_threshold = 1000;
+
+    // range: 0 ~ 100
+    @ConfField(mutable = true, masterOnly = true)
+    public static int urgent_balance_pick_large_disk_usage_percentage = 80;
+
+    // there's a case where all backends have high disk usage; normally urgent 
disk balance will not run then.
+    // if this value is set to true, urgent disk balance will always run,
+    // and the backends will exchange tablets among themselves.
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_urgent_balance_no_low_backend = true;
+
     /**
      * if set to true, TabletScheduler will not do balance.
      */
@@ -1024,7 +1048,7 @@ public class Config extends ConfigBase {
      * when be rebalancer idle, then disk balance will occurs.
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static int be_rebalancer_idle_seconds = 60;
+    public static int be_rebalancer_idle_seconds = 0;
 
     /**
      * if set to true, TabletScheduler will not do disk balance.
@@ -1032,6 +1056,11 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static boolean disable_disk_balance = false;
 
+    // balance order
+    // ATTN: a temporary config, may delete later.
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean balance_be_then_disk = true;
+
     /**
      * if set to false, TabletScheduler will not do disk balance for replica 
num = 1.
      */
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java
index c3ae810582c..7d1736a0754 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java
@@ -78,7 +78,7 @@ public class Pair<F, S> {
         return first.toString() + ":" + second.toString();
     }
 
-    public static class PairComparator<T extends Pair<?, Comparable>> 
implements Comparator<T> {
+    public static class PairComparator<T extends Pair<?, ? extends 
Comparable>> implements Comparator<T> {
         @Override
         public int compare(T o1, T o2) {
             return o1.second.compareTo(o2.second);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index d2ea70873f7..8137dddae13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -644,19 +644,28 @@ public class TabletInvertedIndex {
         return tabletIds;
     }
 
-    public List<Long> getTabletIdsByBackendIdAndStorageMedium(long backendId, 
TStorageMedium storageMedium) {
-        List<Long> tabletIds = Lists.newArrayList();
+    public List<Pair<Long, Long>> 
getTabletSizeByBackendIdAndStorageMedium(long backendId,
+            TStorageMedium storageMedium) {
+        List<Pair<Long, Long>> tabletIdSizes = Lists.newArrayList();
         long stamp = readLock();
         try {
             Map<Long, Replica> replicaMetaWithBackend = 
backingReplicaMetaTable.row(backendId);
             if (replicaMetaWithBackend != null) {
-                tabletIds = replicaMetaWithBackend.keySet().stream().filter(
-                        id -> tabletMetaMap.get(id).getStorageMedium() == 
storageMedium).collect(Collectors.toList());
+                tabletIdSizes = replicaMetaWithBackend.entrySet().stream()
+                        .filter(entry -> 
tabletMetaMap.get(entry.getKey()).getStorageMedium() == storageMedium)
+                        .map(entry -> Pair.of(entry.getKey(), 
entry.getValue().getDataSize()))
+                        .collect(Collectors.toList());
             }
         } finally {
             readUnlock(stamp);
         }
-        return tabletIds;
+        return tabletIdSizes;
+    }
+
+    public List<Long> getTabletIdsByBackendIdAndStorageMedium(long backendId,
+            TStorageMedium storageMedium) {
+        return getTabletSizeByBackendIdAndStorageMedium(backendId, 
storageMedium).stream()
+                .map(Pair::key).collect(Collectors.toList());
     }
 
     public int getTabletNumByBackendId(long backendId) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index 9ac6fce7f2b..f1f632c997a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -39,6 +39,8 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class BackendLoadStatistic {
     private static final Logger LOG = 
LogManager.getLogger(BackendLoadStatistic.class);
@@ -166,6 +168,7 @@ public class BackendLoadStatistic {
     private Map<TStorageMedium, Long> totalReplicaNumMap = Maps.newHashMap();
     private Map<TStorageMedium, LoadScore> loadScoreMap = Maps.newHashMap();
     private Map<TStorageMedium, Classification> clazzMap = Maps.newHashMap();
+    private Map<TStorageMedium, Classification> maxDiskClazzMap = 
Maps.newHashMap();
     private List<RootPathLoadStatistic> pathStatistics = Lists.newArrayList();
 
     public BackendLoadStatistic(long beId, Tag tag, SystemInfoService 
infoService,
@@ -227,6 +230,14 @@ public class BackendLoadStatistic {
         return clazzMap.getOrDefault(medium, Classification.INIT);
     }
 
+    public void setMaxDiskClazz(TStorageMedium medium, Classification clazz) {
+        this.maxDiskClazzMap.put(medium, clazz);
+    }
+
+    public Classification getMaxDiskClazz(TStorageMedium medium) {
+        return maxDiskClazzMap.getOrDefault(medium, Classification.INIT);
+    }
+
     public void init() throws LoadBalanceException {
         Backend be = infoService.getBackend(beId);
         if (be == null) {
@@ -246,9 +257,17 @@ public class BackendLoadStatistic {
                         + (diskInfo.getTotalCapacityB() - 
diskInfo.getAvailableCapacityB()));
             }
 
+            // Doris-compose puts all test backends' disks on the same physical 
disk.
+            // Make a small per-path change to the used capacity here.
+            long usedCapacityB = diskInfo.getDiskUsedCapacityB();
+            if (Config.be_rebalancer_fuzzy_test) {
+                usedCapacityB = Math.min(diskInfo.getTotalCapacityB(),
+                        usedCapacityB + Math.abs(diskInfo.getPathHash()) % 
10000);
+            }
+
             RootPathLoadStatistic pathStatistic = new 
RootPathLoadStatistic(beId, diskInfo.getRootPath(),
                     diskInfo.getPathHash(), diskInfo.getStorageMedium(),
-                    diskInfo.getTotalCapacityB(), 
diskInfo.getDiskUsedCapacityB(), diskInfo.getState());
+                    diskInfo.getTotalCapacityB(), usedCapacityB, 
diskInfo.getState());
             pathStatistics.add(pathStatistic);
         }
 
@@ -295,14 +314,14 @@ public class BackendLoadStatistic {
             if (Math.abs(pathStat.getUsedPercent() - avgUsedPercent)
                     > Math.max(avgUsedPercent * 
Config.balance_load_score_threshold, 0.025)) {
                 if (pathStat.getUsedPercent() > avgUsedPercent) {
-                    pathStat.setClazz(Classification.HIGH);
+                    pathStat.setLocalClazz(Classification.HIGH);
                     highCounter++;
                 } else if (pathStat.getUsedPercent() < avgUsedPercent) {
-                    pathStat.setClazz(Classification.LOW);
+                    pathStat.setLocalClazz(Classification.LOW);
                     lowCounter++;
                 }
             } else {
-                pathStat.setClazz(Classification.MID);
+                pathStat.setLocalClazz(Classification.MID);
                 midCounter++;
             }
         }
@@ -422,14 +441,19 @@ public class BackendLoadStatistic {
 
             BalanceStatus bStatus = pathStatistic.isFit(tabletSize, 
isSupplement);
             if (!bStatus.ok()) {
-                status.addErrMsgs(bStatus.getErrMsgs());
+                if (status != BalanceStatus.OK) {
+                    status.addErrMsgs(bStatus.getErrMsgs());
+                }
                 continue;
             }
 
-            result.add(pathStatistic);
+            if (result != null) {
+                result.add(pathStatistic);
+            }
+            status = BalanceStatus.OK;
         }
 
-        return result.isEmpty() ? status : BalanceStatus.OK;
+        return status;
     }
 
     /**
@@ -508,9 +532,9 @@ public class BackendLoadStatistic {
                 continue;
             }
 
-            if (pathStat.getClazz() == Classification.LOW) {
+            if (pathStat.getLocalClazz() == Classification.LOW) {
                 low.add(pathStat.getPathHash());
-            } else if (pathStat.getClazz() == Classification.HIGH) {
+            } else if (pathStat.getLocalClazz() == Classification.HIGH) {
                 high.add(pathStat.getPathHash());
             } else {
                 mid.add(pathStat.getPathHash());
@@ -529,9 +553,9 @@ public class BackendLoadStatistic {
                 continue;
             }
 
-            if (pathStat.getClazz() == Classification.LOW) {
+            if (pathStat.getLocalClazz() == Classification.LOW) {
                 low.add(pathStat);
-            } else if (pathStat.getClazz() == Classification.HIGH) {
+            } else if (pathStat.getLocalClazz() == Classification.HIGH) {
                 high.add(pathStat);
             } else {
                 mid.add(pathStat);
@@ -569,9 +593,22 @@ public class BackendLoadStatistic {
         return pathStatistics;
     }
 
-    public long getAvailPathNum(TStorageMedium medium) {
-        return pathStatistics.stream().filter(
-                p -> p.getDiskState() == DiskState.ONLINE && 
p.getStorageMedium() == medium).count();
+    RootPathLoadStatistic getPathStatisticByPathHash(long pathHash) {
+        return pathStatistics.stream().filter(pathStat -> 
pathStat.getPathHash() == pathHash)
+                .findFirst().orElse(null);
+    }
+
+    public List<RootPathLoadStatistic> getAvailPaths(TStorageMedium medium) {
+        return getAvailPathStream(medium).collect(Collectors.toList());
+    }
+
+    public boolean hasAvailPathWithGlobalClazz(TStorageMedium medium, 
Classification globalClazz) {
+        return getAvailPathStream(medium).anyMatch(pathStat -> 
pathStat.getGlobalClazz() == globalClazz);
+    }
+
+    private Stream<RootPathLoadStatistic> getAvailPathStream(TStorageMedium 
medium) {
+        return pathStatistics.stream()
+                .filter(p -> p.getDiskState() == DiskState.ONLINE && 
p.getStorageMedium() == medium);
     }
 
     public boolean hasMedium(TStorageMedium medium) {
@@ -603,6 +640,7 @@ public class BackendLoadStatistic {
         long total = totalCapacityMap.getOrDefault(medium, 0L);
         info.add(String.valueOf(used));
         info.add(String.valueOf(total));
+        info.add(getMaxDiskClazz(medium).name());
         info.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(used * 
100
                 / (double) total)));
         info.add(String.valueOf(totalReplicaNumMap.getOrDefault(medium, 0L)));
@@ -610,7 +648,7 @@ public class BackendLoadStatistic {
         info.add(String.valueOf(loadScore.capacityCoefficient));
         info.add(String.valueOf(loadScore.getReplicaNumCoefficient()));
         info.add(String.valueOf(loadScore.score));
-        info.add(clazzMap.getOrDefault(medium, Classification.INIT).name());
+        info.add(getClazz(medium).name());
         return info;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index e7b6211bd79..b40d7f7a512 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -25,12 +25,14 @@ import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
 import 
org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
+import org.apache.doris.clone.BackendLoadStatistic.Classification;
 import org.apache.doris.clone.SchedException.Status;
 import org.apache.doris.clone.SchedException.SubCode;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
@@ -45,6 +47,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /*
  * BeLoadRebalancer strategy:
@@ -79,12 +82,9 @@ public class BeLoadRebalancer extends Rebalancer {
     protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
             LoadStatisticForTag clusterStat, TStorageMedium medium) {
         List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
-
-        // get classification of backends
         List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
-        List<BackendLoadStatistic> midBEs = Lists.newArrayList();
         List<BackendLoadStatistic> highBEs = Lists.newArrayList();
-        clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, 
medium);
+        boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, 
highBEs, medium);
 
         if (lowBEs.isEmpty() && highBEs.isEmpty()) {
             LOG.debug("cluster is balance with medium: {}. skip", medium);
@@ -117,6 +117,8 @@ public class BeLoadRebalancer extends Rebalancer {
         }
         LOG.info("get number of low load paths: {}, with medium: {}", 
numOfLowPaths, medium);
 
+        List<String> alternativeTabletInfos = Lists.newArrayList();
+
         // Clone ut mocked env, but CatalogRecycleBin is not mockable (it 
extends from Thread)
         // so in clone ut recycleBin need to set to null.
         CatalogRecycleBin recycleBin = null;
@@ -125,6 +127,10 @@ public class BeLoadRebalancer extends Rebalancer {
         }
         int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
         ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
+        List<Set<Long>> lowBETablets = lowBEs.stream()
+                .map(beStat -> 
Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
+                .collect(Collectors.toList());
+
         // choose tablets from high load backends.
         // BackendLoadStatistic is sorted by load score in ascend order,
         // so we need to traverse it from last to first
@@ -136,37 +142,73 @@ public class BeLoadRebalancer extends Rebalancer {
                 continue;
             }
 
-            // classify the paths.
-            Set<Long> pathLow = Sets.newHashSet();
-            Set<Long> pathMid = Sets.newHashSet();
-            Set<Long> pathHigh = Sets.newHashSet();
-            beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
-            // we only select tablets from available mid and high load path
-            pathHigh.addAll(pathMid);
-
-            // get all tablets on this backend, and shuffle them for random 
selection
-            List<Long> tabletIds = 
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium);
-            Collections.shuffle(tabletIds);
+            boolean choseHighDisk = isUrgent && beStat.getMaxDiskClazz(medium) 
== Classification.HIGH;
 
             // for each path, we try to select at most 
BALANCE_SLOT_NUM_FOR_PATH tablets
             Map<Long, Integer> remainingPaths = Maps.newHashMap();
+            Set<Long> pathHigh = null;
+            if (choseHighDisk) {
+                pathHigh = 
beStat.getAvailPaths(medium).stream().filter(RootPathLoadStatistic::isGlobalHighUsage)
+                        
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
+            } else {
+                // classify the paths.
+                pathHigh = Sets.newHashSet();
+                Set<Long> pathLow = Sets.newHashSet();
+                Set<Long> pathMid = Sets.newHashSet();
+                beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, 
medium);
+                // we only select tablets from available mid and high load path
+                pathHigh.addAll(pathMid);
+            }
+
+            double highDiskMaxUsage = 0;
             for (Long pathHash : pathHigh) {
                 int availBalanceNum = 
pathSlot.getAvailableBalanceNum(pathHash);
                 if (availBalanceNum > 0) {
                     remainingPaths.put(pathHash, availBalanceNum);
                 }
+
+                RootPathLoadStatistic pathStat = 
beStat.getPathStatisticByPathHash(pathHash);
+                if (pathStat != null) {
+                    highDiskMaxUsage = Math.max(highDiskMaxUsage, 
pathStat.getUsedPercent());
+                }
             }
 
+            LOG.debug("high be {}, medium: {}, path high: {}, remainingPaths: 
{}, chose high disk: {}",
+                    beStat.getBeId(), medium, pathHigh, remainingPaths, 
choseHighDisk);
+
             if (remainingPaths.isEmpty()) {
                 continue;
             }
 
+            // get all tablets on this backend; shuffle them, or sort by 
size for urgent balance
+            List<Pair<Long, Long>> tabletIdSizes = 
invertedIndex.getTabletSizeByBackendIdAndStorageMedium(
+                    beStat.getBeId(), medium);
+            if (!isUrgent
+                    || tabletIdSizes.size() < 
Config.urgent_balance_pick_large_tablet_num_threshold
+                    || highDiskMaxUsage < (double) 
Config.urgent_balance_pick_large_disk_usage_percentage / 100.0
+                    || Config.urgent_balance_shuffle_large_tablet_percentage 
>= 100
+                    || Config.urgent_balance_shuffle_large_tablet_percentage < 
0) {
+                Collections.shuffle(tabletIdSizes);
+            } else {
+                Collections.sort(tabletIdSizes, new 
Pair.PairComparator<Pair<Long, Long>>());
+                if (Config.urgent_balance_shuffle_large_tablet_percentage > 0) 
{
+                    int startIndex = (int) (tabletIdSizes.size()
+                            * (1 - (double) 
Config.urgent_balance_shuffle_large_tablet_percentage / 100.0));
+                    Collections.shuffle(tabletIdSizes.subList(startIndex, 
tabletIdSizes.size()));
+                }
+            }
+
             // select tablet from shuffled tablets
-            for (Long tabletId : tabletIds) {
+            for (int j = tabletIdSizes.size() - 1; j >= 0; j--) {
+                long tabletId = tabletIdSizes.get(j).key();
                 if (clusterAvailableBEnum <= 
invertedIndex.getReplicasByTabletId(tabletId).size()) {
                     continue;
                 }
 
+                if (alternativeTablets.stream().anyMatch(tabletCtx -> tabletId 
== tabletCtx.getTabletId())) {
+                    continue;
+                }
+
                 Replica replica = invertedIndex.getReplica(tabletId, 
beStat.getBeId());
                 if (replica == null) {
                     continue;
@@ -186,20 +228,40 @@ public class BeLoadRebalancer extends Rebalancer {
                         continue;
                     }
 
+                    // for urgent disk, tablets are picked in order of size,
+                    // so it may always pick tablets that are already on the low 
backends.
+                    if (!lowBETablets.isEmpty()
+                            && lowBETablets.stream().allMatch(tablets -> 
tablets.contains(tabletId))) {
+                        continue;
+                    }
+
                     if (recycleBin != null && 
recycleBin.isRecyclePartition(tabletMeta.getDbId(),
                             tabletMeta.getTableId(), 
tabletMeta.getPartitionId())) {
                         continue;
                     }
 
+                    boolean isFit = lowBEs.stream().anyMatch(be -> 
be.isFit(replica.getDataSize(),
+                            medium, null, false) == BalanceStatus.OK);
+                    if (!isFit) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("tablet {} with size {} medium {} not 
fit in low backends",
+                                    tabletId, replica.getDataSize(), medium);
+                        }
+                        continue;
+                    }
+
                     TabletSchedCtx tabletCtx = new 
TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
                             tabletMeta.getDbId(), tabletMeta.getTableId(), 
tabletMeta.getPartitionId(),
                             tabletMeta.getIndexId(), tabletId, null /* replica 
alloc is not used for balance*/,
                             System.currentTimeMillis());
                     tabletCtx.setTag(clusterStat.getTag());
                     // balance task's priority is always LOW
-                    tabletCtx.setPriority(Priority.LOW);
+                    tabletCtx.setPriority(isUrgent ? Priority.NORMAL : 
Priority.LOW);
 
                     alternativeTablets.add(tabletCtx);
+                    alternativeTabletInfos.add("{ tabletId=" + tabletId + ", 
beId=" + beStat.getBeId()
+                            + ", pathHash=" + replica.getPathHash()
+                            + ", replicaLocalSize=" + replica.getDataSize() + 
" }");
                     if (--numOfLowPaths <= 0) {
                         // enough
                         break OUTER;
@@ -217,13 +279,13 @@ public class BeLoadRebalancer extends Rebalancer {
         } // end for high backends
 
         if (!alternativeTablets.isEmpty()) {
-            LOG.info("select alternative tablets, medium: {}, num: {}, detail: 
{}",
-                    medium, alternativeTablets.size(),
-                    
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+            LOG.info("select alternative tablets, medium: {}, is urgent: {}, 
num: {}, detail: {}",
+                    medium, isUrgent, alternativeTablets.size(), 
alternativeTabletInfos);
         }
         return alternativeTablets;
     }
 
+
     /*
      * Create a clone task of this selected tablet for balance.
      * 1. Check if this tablet has replica on high load backend. If not, the 
balance will be cancelled.
@@ -239,17 +301,17 @@ public class BeLoadRebalancer extends Rebalancer {
         }
 
         // get classification of backends
-        List<BackendLoadStatistic> lowBe = Lists.newArrayList();
-        List<BackendLoadStatistic> midBe = Lists.newArrayList();
-        List<BackendLoadStatistic> highBe = Lists.newArrayList();
-        clusterStat.getBackendStatisticByClass(lowBe, midBe, highBe, 
tabletCtx.getStorageMedium());
+        List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
+        List<BackendLoadStatistic> highBEs = Lists.newArrayList();
+        boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, 
highBEs, tabletCtx.getStorageMedium());
+        String isUrgentInfo = isUrgent ? " for urgent" : " for non-urgent";
 
-        if (lowBe.isEmpty() && highBe.isEmpty()) {
+        if (lowBEs.isEmpty() && highBEs.isEmpty()) {
             throw new SchedException(Status.UNRECOVERABLE, 
SubCode.DIAGNOSE_IGNORE, "cluster is balance");
         }
 
         // if all low backends is not available, return
-        if (lowBe.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
+        if (lowBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
             throw new SchedException(Status.UNRECOVERABLE, "all low load 
backends is unavailable");
         }
 
@@ -258,21 +320,21 @@ public class BeLoadRebalancer extends Rebalancer {
         // Check if this tablet has replica on high load backend.
         // Also create a set to save hosts of this tablet.
         Set<String> hosts = Sets.newHashSet();
-        boolean hasHighReplica = false;
-        for (Replica replica : replicas) {
-            if (highBe.stream().anyMatch(b -> b.getBeId() == 
replica.getBackendId())) {
-                hasHighReplica = true;
+        List<BackendLoadStatistic> replicaHighBEs = Lists.newArrayList();
+        for (BackendLoadStatistic beStat : highBEs) {
+            if (replicas.stream().anyMatch(replica -> beStat.getBeId() == 
replica.getBackendId())) {
+                replicaHighBEs.add(beStat);
             }
-            Backend be = infoService.getBackend(replica.getBackendId());
+            Backend be = infoService.getBackend(beStat.getBeId());
             if (be == null) {
                 throw new SchedException(Status.UNRECOVERABLE, 
SubCode.DIAGNOSE_IGNORE,
-                        "backend is dropped: " + replica.getBackendId());
+                        "backend is dropped: " + beStat.getBeId());
             }
             hosts.add(be.getHost());
         }
-        if (!hasHighReplica) {
+        if (replicaHighBEs.isEmpty()) {
             throw new SchedException(Status.UNRECOVERABLE, 
SubCode.DIAGNOSE_IGNORE,
-                    "no replica on high load backend");
+                    "no replica on high load backend" + isUrgentInfo);
         }
 
         // select a replica as source
@@ -290,12 +352,12 @@ public class BeLoadRebalancer extends Rebalancer {
             }
         }
         if (!setSource) {
-            throw new SchedException(Status.UNRECOVERABLE, 
SubCode.DIAGNOSE_IGNORE, "unable to take src backend slot");
+            throw new SchedException(Status.UNRECOVERABLE, "unable to take src 
slot" + isUrgentInfo);
         }
 
         // Select a low load backend as destination.
         List<BackendLoadStatistic> candidates = Lists.newArrayList();
-        for (BackendLoadStatistic beStat : lowBe) {
+        for (BackendLoadStatistic beStat : lowBEs) {
             if (beStat.isAvailable() && replicas.stream().noneMatch(r -> 
r.getBackendId() == beStat.getBeId())) {
                 // check if on same host.
                 Backend lowBackend = infoService.getBackend(beStat.getBeId());
@@ -308,18 +370,22 @@ public class BeLoadRebalancer extends Rebalancer {
 
                 // no replica on this low load backend
                 // 1. check if this clone task can make the cluster more 
balance.
-                List<RootPathLoadStatistic> availPaths = Lists.newArrayList();
-                BalanceStatus bs;
-                if ((bs = beStat.isFit(tabletCtx.getTabletSize(), 
tabletCtx.getStorageMedium(), availPaths,
-                        false /* not supplement */)) != BalanceStatus.OK) {
-                    LOG.debug("tablet not fit in BE {}, reason: {}", 
beStat.getBeId(), bs.getErrMsgs());
+                BalanceStatus bs = beStat.isFit(tabletCtx.getTabletSize(), 
tabletCtx.getStorageMedium(), null,
+                        false /* not supplement */);
+                if (bs != BalanceStatus.OK) {
+                    LOG.debug("tablet not fit in BE {}, reason: {}, {}",
+                            beStat.getBeId(), bs.getErrMsgs(), isUrgentInfo);
                     continue;
                 }
 
-                if (!Config.be_rebalancer_fuzzy_test && 
!clusterStat.isMoreBalanced(
-                        tabletCtx.getSrcBackendId(), beStat.getBeId(), 
tabletCtx.getTabletId(),
-                        tabletCtx.getTabletSize(), 
tabletCtx.getStorageMedium())) {
-                    continue;
+                if (!Config.be_rebalancer_fuzzy_test && !isUrgent) {
+                    boolean moreBalanced = 
replicaHighBEs.stream().anyMatch(highBeStat ->
+                            clusterStat.isMoreBalanced(highBeStat.getBeId(), 
beStat.getBeId(),
+                            tabletCtx.getTabletId(), tabletCtx.getTabletSize(),
+                            tabletCtx.getStorageMedium()));
+                    if (!moreBalanced) {
+                        continue;
+                    }
                 }
 
                 PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
@@ -333,7 +399,8 @@ public class BeLoadRebalancer extends Rebalancer {
         }
 
         if (candidates.isEmpty()) {
-            throw new SchedException(Status.UNRECOVERABLE, 
SubCode.DIAGNOSE_IGNORE, "unable to find low dest backend");
+            throw new SchedException(Status.UNRECOVERABLE, 
SubCode.DIAGNOSE_IGNORE,
+                    "unable to find low dest backend" + isUrgentInfo);
         }
 
         List<BePathLoadStatPair> candFitPaths = Lists.newArrayList();
@@ -343,15 +410,27 @@ public class BeLoadRebalancer extends Rebalancer {
                 continue;
             }
 
-            // classify the paths.
-            // And we only select path from 'low' and 'mid' paths
-            List<RootPathLoadStatistic> pathLow = Lists.newArrayList();
-            List<RootPathLoadStatistic> pathMid = Lists.newArrayList();
-            List<RootPathLoadStatistic> pathHigh = Lists.newArrayList();
-            beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
+            List<RootPathLoadStatistic> pathLow = null;
+            if (isUrgent) {
+                pathLow = beStat.getAvailPaths(tabletCtx.getStorageMedium()).stream()
+                        .filter(RootPathLoadStatistic::isGlobalLowUsage)
+                        .collect(Collectors.toList());
+            } else {
+                // classify the paths.
+                // And we only select path from 'low' and 'mid' paths
+                pathLow = Lists.newArrayList();
+                List<RootPathLoadStatistic> pathMid = Lists.newArrayList();
+                List<RootPathLoadStatistic> pathHigh = Lists.newArrayList();
+                beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
+
+                pathLow.addAll(pathMid);
+            }
+            pathLow.forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path)));
+        }
 
-            pathLow.addAll(pathMid);
-            pathLow.stream().forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path)));
+        if (candFitPaths.isEmpty()) {
+            throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
+                    "unable to find low dest backend to fit in paths" + isUrgentInfo);
         }
 
         BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(candFitPaths);
@@ -359,6 +438,7 @@ public class BeLoadRebalancer extends Rebalancer {
         for (BePathLoadStatPair bePathLoadStat : candFitPaths) {
             BackendLoadStatistic beStat = bePathLoadStat.getBackendLoadStatistic();
             RootPathLoadStatistic pathStat = bePathLoadStat.getPathLoadStatistic();
+
             PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
             if (slot == null) {
                 continue;
@@ -370,7 +450,7 @@ public class BeLoadRebalancer extends Rebalancer {
         }
 
         throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
-                "beload waiting for dest backend slot");
+                "unable to take dest slot" + isUrgentInfo);
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
index 158f2cde4af..faf9704a902 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.clone.BackendLoadStatistic.Classification;
 import org.apache.doris.clone.BackendLoadStatistic.LoadScore;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.resource.Tag;
@@ -38,6 +39,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 /*
@@ -144,6 +146,7 @@ public class LoadStatisticForTag {
         // classify all backends
         for (TStorageMedium medium : TStorageMedium.values()) {
             classifyBackendByLoad(medium);
+            classifyBackendByMaxDiskUsage(medium);
         }
 
         // sort be stats by mix load score
@@ -245,6 +248,84 @@ public class LoadStatisticForTag {
                 medium, avgLoadScore, lowCounter, midCounter, highCounter);
     }
 
+    private void classifyBackendByMaxDiskUsage(TStorageMedium medium) {
+        calcDiskGlobalUsages(medium);
+        Classification[] clazzs = { Classification.HIGH, Classification.LOW, Classification.MID };
+        for (BackendLoadStatistic beStat : beLoadStatistics) {
+            if (!beStat.hasMedium(medium)) {
+                continue;
+            }
+            for (Classification clazz : clazzs) {
+                if (beStat.hasAvailPathWithGlobalClazz(medium, clazz)) {
+                    beStat.setMaxDiskClazz(medium, clazz);
+                    break;
+                }
+            }
+        }
+    }
+
+    private void calcDiskGlobalUsages(TStorageMedium medium) {
+        double urgentDiffUsageThreshold;
+        if (Config.be_rebalancer_fuzzy_test) {
+            urgentDiffUsageThreshold = 0;
+        } else {
+            urgentDiffUsageThreshold = Config.balance_load_score_threshold
+                    + Config.urgent_balance_disk_usage_extra_threshold;
+            if (urgentDiffUsageThreshold <= 0) {
+                return;
+            }
+        }
+
+        double totalDiskUsages = 0;
+        int totalDiskNum = 0;
+        for (BackendLoadStatistic beStat : getBackendLoadStatistics()) {
+            if (!beStat.isAvailable()) {
+                continue;
+            }
+            for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) {
+                if (pathStat.getCapacityB() > 1L) {
+                    totalDiskUsages += pathStat.getUsedPercent();
+                    totalDiskNum++;
+                }
+            }
+        }
+
+        if (totalDiskNum == 0) {
+            return;
+        }
+
+        double avgDiskUsage = totalDiskUsages / totalDiskNum;
+        double urgentDiskUsage = avgDiskUsage + urgentDiffUsageThreshold;
+
+        boolean hasHighDisk = false;
+        for (BackendLoadStatistic beStat : getBackendLoadStatistics()) {
+            if (!beStat.isAvailable()) {
+                continue;
+            }
+            for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) {
+                if (pathStat.getCapacityB() > 1L) {
+                    double usage = pathStat.getUsedPercent();
+                    if (usage > urgentDiskUsage) {
+                        pathStat.setGlobalClazz(Classification.HIGH);
+                        hasHighDisk = true;
+                    } else if (usage > avgDiskUsage) {
+                        pathStat.setGlobalClazz(Classification.MID);
+                    } else {
+                        pathStat.setGlobalClazz(Classification.LOW);
+                    }
+                }
+            }
+        }
+
+        if (!hasHighDisk) {
+            for (BackendLoadStatistic beStat : getBackendLoadStatistics()) {
+                for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) {
+                    pathStat.setGlobalClazz(Classification.MID);
+                }
+            }
+        }
+    }
+
     private static void sortBeStats(List<BackendLoadStatistic> beStats, TStorageMedium medium) {
         if (medium == null) {
             Collections.sort(beStats, BackendLoadStatistic.MIX_COMPARATOR);
@@ -353,7 +434,8 @@ public class LoadStatisticForTag {
                 pathStat.add(String.valueOf(pathStatistic.getCapacityB()));
                 pathStat.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(
                         pathStatistic.getUsedCapacityB() * 100 / (double) pathStatistic.getCapacityB())));
-                pathStat.add(pathStatistic.getClazz().name());
+                pathStat.add(pathStatistic.getLocalClazz().name());
+                pathStat.add(pathStatistic.getGlobalClazz().name());
                 pathStat.add(pathStatistic.getDiskState().name());
                 statistics.add(pathStat);
             }
@@ -375,6 +457,88 @@ public class LoadStatisticForTag {
         return beLoadStatistics;
     }
 
+    public boolean getLowHighBEsWithIsUrgent(List<BackendLoadStatistic> lowBEs, List<BackendLoadStatistic> highBEs,
+            TStorageMedium medium) {
+        if (getUrgentLowHighBEs(lowBEs, highBEs, medium)) {
+            return true;
+        } else {
+            lowBEs.clear();
+            highBEs.clear();
+            List<BackendLoadStatistic> midBEs = Lists.newArrayList();
+            getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
+            return false;
+        }
+    }
+
+    private boolean getUrgentLowHighBEs(List<BackendLoadStatistic> lowBEs, List<BackendLoadStatistic> highBEs,
+            TStorageMedium medium) {
+        List<BackendLoadStatistic> midBEs = Lists.newArrayList();
+        for (BackendLoadStatistic beStat : getBackendLoadStatistics()) {
+            if (!beStat.isAvailable()) {
+                continue;
+            }
+            switch (beStat.getMaxDiskClazz(medium)) {
+                case LOW:
+                    lowBEs.add(beStat);
+                    break;
+                case MID:
+                    midBEs.add(beStat);
+                    break;
+                case HIGH:
+                    highBEs.add(beStat);
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        if (lowBEs.isEmpty()) {
+            lowBEs.addAll(midBEs);
+        }
+
+        if (lowBEs.isEmpty() && highBEs.size() > 1 && Config.enable_urgent_balance_no_low_backend) {
+            // all backends will exchange tablets among themselves.
+            lowBEs.addAll(highBEs);
+        }
+
+        if (lowBEs.isEmpty() || highBEs.isEmpty()) {
+            lowBEs.clear();
+            highBEs.clear();
+            return false;
+        }
+
+        BiConsumer<List<BackendLoadStatistic>, Boolean> resortBeStats = (beStats, choseMinPathElseMaxPath) -> {
+            List<Pair<BackendLoadStatistic, Double>> bePairs = Lists.newArrayList();
+            for (BackendLoadStatistic beStat : beStats) {
+                double score = -1.0;
+                for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) {
+                    if (pathStat.getCapacityB() > 1) {
+                        double usage = pathStat.getUsedPercent();
+                        if (score < 0 || (choseMinPathElseMaxPath && usage < score)
+                                || (!choseMinPathElseMaxPath && usage > score)) {
+                            score = usage;
+                        }
+                    }
+                }
+                bePairs.add(Pair.of(beStat, score));
+            }
+            Collections.sort(bePairs, new Pair.PairComparator<Pair<BackendLoadStatistic, Double>>());
+
+            beStats.clear();
+            bePairs.forEach(pair -> beStats.add(pair.key()));
+        };
+
+        resortBeStats.accept(lowBEs, true);
+        resortBeStats.accept(highBEs, false);
+
+        LOG.debug("urgent backends' classification lowBe {}, highBe {}, medium: {}",
+                lowBEs.stream().map(BackendLoadStatistic::getBeId).collect(Collectors.toList()),
+                highBEs.stream().map(BackendLoadStatistic::getBeId).collect(Collectors.toList()),
+                medium);
+
+        return true;
+    }
+
     /*
      * If cluster is balance, all Backends will be in 'mid', and 'high' and 'low' is empty
      * If both 'high' and 'low' has Backends, just return
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
index d2f1983a831..3e94bebcfef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
@@ -34,7 +34,11 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic>
     private long copingSizeB;
     private DiskState diskState;
 
-    private Classification clazz = Classification.INIT;
+    // localClazz is the classification compared with other disks on the same backend
+    private Classification localClazz = Classification.INIT;
+
+    // globalClazz is the classification compared with other disks on all backends
+    private Classification globalClazz = Classification.INIT;
 
     public RootPathLoadStatistic(long beId, String path, Long pathHash, TStorageMedium storageMedium,
             long capacityB, long usedCapacityB, DiskState diskState) {
@@ -80,12 +84,28 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic>
         copingSizeB += size;
     }
 
-    public void setClazz(Classification clazz) {
-        this.clazz = clazz;
+    public void setLocalClazz(Classification clazz) {
+        this.localClazz = clazz;
+    }
+
+    public Classification getLocalClazz() {
+        return localClazz;
+    }
+
+    public void setGlobalClazz(Classification clazz) {
+        this.globalClazz = clazz;
+    }
+
+    public Classification getGlobalClazz() {
+        return globalClazz;
+    }
+
+    public boolean isGlobalHighUsage() {
+        return globalClazz == Classification.HIGH;
     }
 
-    public Classification getClazz() {
-        return clazz;
+    public boolean isGlobalLowUsage() {
+        return globalClazz == Classification.LOW;
     }
 
     public DiskState getDiskState() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 3f09a65b45e..4cab2fd8bee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -842,6 +842,7 @@ public class TabletScheduler extends MasterDaemon {
                 || deleteReplicaOnSameHost(tabletCtx, force)
                 || deleteReplicaNotInValidTag(tabletCtx, force)
                 || deleteReplicaChosenByRebalancer(tabletCtx, force)
+                || deleteReplicaOnUrgentHighDisk(tabletCtx, force)
                 || deleteReplicaOnHighLoadBackend(tabletCtx, force)) {
             // if we delete at least one redundant replica, we still throw a SchedException with status FINISHED
             // to remove this tablet from the pendingTablets(consider it as finished)
@@ -990,6 +991,34 @@ public class TabletScheduler extends MasterDaemon {
         return true;
     }
 
+    private boolean deleteReplicaOnUrgentHighDisk(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
+        Tag tag = chooseProperTag(tabletCtx, false);
+        LoadStatisticForTag statistic = statisticMap.get(tag);
+        if (statistic == null) {
+            return false;
+        }
+
+        Replica chosenReplica = null;
+        double maxUsages = -1;
+        for (Replica replica : tabletCtx.getReplicas()) {
+            BackendLoadStatistic beStatistic = statistic.getBackendLoadStatistic(replica.getBackendId());
+            if (beStatistic == null) {
+                continue;
+            }
+            RootPathLoadStatistic path = beStatistic.getPathStatisticByPathHash(replica.getPathHash());
+            if (path != null && path.isGlobalHighUsage() && path.getUsedPercent() > maxUsages) {
+                maxUsages = path.getUsedPercent();
+                chosenReplica = replica;
+            }
+        }
+
+        if (chosenReplica != null) {
+            deleteReplicaInternal(tabletCtx, chosenReplica, "high usage disk", force);
+            return true;
+        }
+        return false;
+    }
+
     private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
         Tag tag = chooseProperTag(tabletCtx, false);
         LoadStatisticForTag statistic = statisticMap.get(tag);
@@ -1037,7 +1066,7 @@ public class TabletScheduler extends MasterDaemon {
         }
 
         if (chosenReplica != null) {
-            deleteReplicaInternal(tabletCtx, chosenReplica, "high load", force);
+            deleteReplicaInternal(tabletCtx, chosenReplica, "high load backend", force);
             return true;
         }
         return false;
@@ -1251,48 +1280,71 @@ public class TabletScheduler extends MasterDaemon {
             return;
         }
 
-        // No need to prefetch too many balance task to pending queue.
-        // Because for every sched, it will re select the balance task.
-        int needAddBalanceNum = Math.min(Config.schedule_batch_size - getPendingNum(),
-                    Config.max_balancing_tablets - getBalanceTabletsNumber());
-        if (needAddBalanceNum <= 0) {
-            return;
+        // TODO: too ugly, remove balance_be_then_disk later.
+        if (Config.balance_be_then_disk) {
+            boolean hasBeBalance = selectTabletsForBeBalance();
+            selectTabletsForDiskBalance(hasBeBalance);
+        } else {
+            selectTabletsForDiskBalance(false);
+            selectTabletsForBeBalance();
+        }
+    }
+
+    private boolean selectTabletsForBeBalance() {
+        int limit = getBalanceSchedQuotoLeft();
+        if (limit <= 0) {
+            return false;
         }
 
+        int addNum = 0;
         List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
         Collections.shuffle(alternativeTablets);
         for (TabletSchedCtx tabletCtx : alternativeTablets) {
-            if (needAddBalanceNum > 0 && addTablet(tabletCtx, false) == AddResult.ADDED) {
-                needAddBalanceNum--;
+            if (addNum >= limit) {
+                break;
+            }
+            if (addTablet(tabletCtx, false) == AddResult.ADDED) {
+                addNum++;
             } else {
                 rebalancer.onTabletFailed(tabletCtx);
             }
         }
-        if (needAddBalanceNum <= 0) {
-            return;
-        }
+        return addNum > 0;
+    }
+
+    private void selectTabletsForDiskBalance(boolean hasBeBalance) {
         if (Config.disable_disk_balance) {
             LOG.info("disk balance is disabled. skip selecting tablets for disk balance");
             return;
         }
-        List<TabletSchedCtx> diskBalanceTablets = Lists.newArrayList();
-        // if default rebalancer can not get new task or user given prio BEs, then use disk rebalancer to get task
-        if (diskRebalancer.hasPrioBackends() || alternativeTablets.isEmpty()) {
-            diskBalanceTablets = diskRebalancer.selectAlternativeTablets();
+
+        int limit = getBalanceSchedQuotoLeft();
+        if (limit <= 0) {
+            return;
         }
-        for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
+
+        int addNum = 0;
+        for (TabletSchedCtx tabletCtx : diskRebalancer.selectAlternativeTablets()) {
+            if (addNum >= limit) {
+                break;
+            }
             // add if task from prio backend or cluster is balanced
-            if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) {
+            if (!hasBeBalance || Config.be_rebalancer_idle_seconds <= 0
+                    || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) {
                 if (addTablet(tabletCtx, false) == AddResult.ADDED) {
-                    needAddBalanceNum--;
-                    if (needAddBalanceNum <= 0) {
-                        break;
-                    }
+                    addNum++;
                 }
             }
         }
     }
 
+    private int getBalanceSchedQuotoLeft() {
+        // No need to prefetch too many balance task to pending queue.
+        // Because for every sched, it will re select the balance task.
+        return Math.min(Config.schedule_batch_size - getPendingNum(),
+                Config.max_balancing_tablets - getBalanceTabletsNumber());
+    }
+
     /**
      * Try to create a balance task for a tablet.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java
index 12bf2aba169..1bec8a5d7a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java
@@ -27,7 +27,7 @@ public class BackendLoadStatisticProcNode implements ProcNodeInterface {
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
             .add("RootPath").add("PathHash").add("StorageMedium")
             .add("DataUsedCapacity").add("TotalCapacity").add("TotalUsedPct")
-            .add("Class").add("State")
+            .add("ClassInOneBE").add("ClassInAllBE").add("State")
             .build();
 
     private final LoadStatisticForTag statistic;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java
index 29eafabf158..1c623effc10 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java
@@ -29,7 +29,7 @@ import com.google.common.collect.ImmutableList;
 // show proc "/cluster_balance/cluster_load_stat/location_default/HDD";
 public class ClusterLoadStatisticProcDir implements ProcDirInterface {
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
-            .add("BeId").add("Available").add("UsedCapacity").add("Capacity")
+            .add("BeId").add("Available").add("UsedCapacity").add("Capacity").add("MaxDisk")
             .add("UsedPercent").add("ReplicaNum").add("CapCoeff").add("ReplCoeff").add("Score")
             .add("Class")
             .build();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
index 81fdaafc969..05abfacdce0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.clone.BackendLoadStatistic.Classification;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -51,24 +52,28 @@ public class ClusterLoadStatisticsTest {
     @Before
     public void setUp() {
         // be1
+        // 50%, 95%, 2%
         be1 = new Backend(10001, "192.168.0.1", 9051);
         Map<String, DiskInfo> disks = Maps.newHashMap();
         DiskInfo diskInfo1 = new DiskInfo("/path1");
-        diskInfo1.setTotalCapacityB(1000000);
-        diskInfo1.setAvailableCapacityB(500000);
-        diskInfo1.setDataUsedCapacityB(480000);
+        diskInfo1.setTotalCapacityB(1_000_000);
+        diskInfo1.setAvailableCapacityB(500_000);
+        diskInfo1.setDataUsedCapacityB(480_000);
+        diskInfo1.setPathHash(1001);
         disks.put(diskInfo1.getRootPath(), diskInfo1);
 
         DiskInfo diskInfo2 = new DiskInfo("/path2");
-        diskInfo2.setTotalCapacityB(2000000);
-        diskInfo2.setAvailableCapacityB(100000);
-        diskInfo2.setDataUsedCapacityB(80000);
+        diskInfo2.setTotalCapacityB(2_000_000);
+        diskInfo2.setAvailableCapacityB(100_000);
+        diskInfo2.setDataUsedCapacityB(80_000);
+        diskInfo2.setPathHash(1002);
         disks.put(diskInfo2.getRootPath(), diskInfo2);
 
         DiskInfo diskInfo3 = new DiskInfo("/path3");
-        diskInfo3.setTotalCapacityB(500000);
-        diskInfo3.setAvailableCapacityB(490000);
-        diskInfo3.setDataUsedCapacityB(10000);
+        diskInfo3.setTotalCapacityB(500_000);
+        diskInfo3.setAvailableCapacityB(490_000);
+        diskInfo3.setDataUsedCapacityB(10_000);
+        diskInfo3.setPathHash(1003);
         disks.put(diskInfo3.getRootPath(), diskInfo3);
 
         be1.setDisks(ImmutableMap.copyOf(disks));
@@ -78,15 +83,17 @@ public class ClusterLoadStatisticsTest {
         be2 = new Backend(10002, "192.168.0.2", 9052);
         disks = Maps.newHashMap();
         diskInfo1 = new DiskInfo("/path1");
-        diskInfo1.setTotalCapacityB(2000000);
-        diskInfo1.setAvailableCapacityB(1900000);
-        diskInfo1.setDataUsedCapacityB(480000);
+        diskInfo1.setTotalCapacityB(2_000_000);
+        diskInfo1.setAvailableCapacityB(1_900_000);
+        diskInfo1.setDataUsedCapacityB(480_000);
+        diskInfo1.setPathHash(2001);
         disks.put(diskInfo1.getRootPath(), diskInfo1);
 
         diskInfo2 = new DiskInfo("/path2");
-        diskInfo2.setTotalCapacityB(20000000);
-        diskInfo2.setAvailableCapacityB(1000000);
-        diskInfo2.setDataUsedCapacityB(80000);
+        diskInfo2.setTotalCapacityB(20_000_000);
+        diskInfo2.setAvailableCapacityB(1_000_000);
+        diskInfo2.setDataUsedCapacityB(80_000);
+        diskInfo2.setPathHash(2002);
         disks.put(diskInfo2.getRootPath(), diskInfo2);
 
         be2.setDisks(ImmutableMap.copyOf(disks));
@@ -96,21 +103,24 @@ public class ClusterLoadStatisticsTest {
         be3 = new Backend(10003, "192.168.0.3", 9053);
         disks = Maps.newHashMap();
         diskInfo1 = new DiskInfo("/path1");
-        diskInfo1.setTotalCapacityB(4000000);
-        diskInfo1.setAvailableCapacityB(100000);
-        diskInfo1.setDataUsedCapacityB(80000);
+        diskInfo1.setTotalCapacityB(4_000_000);
+        diskInfo1.setAvailableCapacityB(100_000);
+        diskInfo1.setDataUsedCapacityB(80_000);
+        diskInfo1.setPathHash(3001);
         disks.put(diskInfo1.getRootPath(), diskInfo1);
 
         diskInfo2 = new DiskInfo("/path2");
-        diskInfo2.setTotalCapacityB(2000000);
-        diskInfo2.setAvailableCapacityB(100000);
-        diskInfo2.setDataUsedCapacityB(80000);
+        diskInfo2.setTotalCapacityB(2_000_000);
+        diskInfo2.setAvailableCapacityB(100_000);
+        diskInfo2.setDataUsedCapacityB(80_000);
+        diskInfo2.setPathHash(3002);
         disks.put(diskInfo2.getRootPath(), diskInfo2);
 
         diskInfo3 = new DiskInfo("/path3");
-        diskInfo3.setTotalCapacityB(500000);
-        diskInfo3.setAvailableCapacityB(490000);
-        diskInfo3.setDataUsedCapacityB(10000);
+        diskInfo3.setTotalCapacityB(500_000);
+        diskInfo3.setAvailableCapacityB(490_000);
+        diskInfo3.setDataUsedCapacityB(10_000);
+        diskInfo3.setPathHash(3003);
         disks.put(diskInfo3.getRootPath(), diskInfo3);
 
         be3.setDisks(ImmutableMap.copyOf(disks));
@@ -120,9 +130,9 @@ public class ClusterLoadStatisticsTest {
         be4 = new Backend(10004, "192.168.0.4", 9053);
         disks = Maps.newHashMap();
         diskInfo1 = new DiskInfo("/path1");
-        diskInfo1.setTotalCapacityB(4000000);
-        diskInfo1.setAvailableCapacityB(100000);
-        diskInfo1.setDataUsedCapacityB(80000);
+        diskInfo1.setTotalCapacityB(4_000_000);
+        diskInfo1.setAvailableCapacityB(100_000);
+        diskInfo1.setDataUsedCapacityB(80_000);
         disks.put(diskInfo1.getRootPath(), diskInfo1);
 
         be4.setDisks(ImmutableMap.copyOf(disks));
@@ -161,6 +171,14 @@ public class ClusterLoadStatisticsTest {
         loadStatistic.init();
         List<List<String>> infos = loadStatistic.getStatistic(TStorageMedium.HDD);
         Assert.assertEquals(3, infos.size());
+        BackendLoadStatistic beStat1 = loadStatistic.getBackendLoadStatistic(be1.getId());
+        Assert.assertNotNull(beStat1);
+        RootPathLoadStatistic path2 = beStat1.getPathStatisticByPathHash(1002);
+        RootPathLoadStatistic path3 = beStat1.getPathStatisticByPathHash(1003);
+        Assert.assertEquals(Classification.HIGH, path2.getLocalClazz());
+        Assert.assertEquals(Classification.HIGH, path2.getGlobalClazz());
+        Assert.assertEquals(Classification.LOW, path3.getLocalClazz());
+        Assert.assertEquals(Classification.LOW, path3.getGlobalClazz());
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to