This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d97be13ba7624d052073a0924c478f2dedcc353b
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Aug 17 22:30:49 2023 +0800

    [improvement](tablet clone) improve tablet balance, scaling speed etc (#22317)
---
 .../main/java/org/apache/doris/common/Config.java  |  10 +-
 .../apache/doris/clone/BackendLoadStatistic.java   | 112 +++++++++-
 .../org/apache/doris/clone/BeLoadRebalancer.java   |  56 +++--
 .../org/apache/doris/clone/DiskRebalancer.java     |  20 +-
 .../apache/doris/clone/LoadStatisticForTag.java    |   4 +
 .../apache/doris/clone/PartitionRebalancer.java    |  12 +-
 .../java/org/apache/doris/clone/Rebalancer.java    |  11 +-
 .../apache/doris/clone/RootPathLoadStatistic.java  |   8 +-
 .../org/apache/doris/clone/TabletSchedCtx.java     |  70 +++++--
 .../org/apache/doris/clone/TabletScheduler.java    | 228 ++++++++++++++-------
 .../java/org/apache/doris/common/FeConstants.java  |   2 +
 .../common/proc/TabletSchedulerDetailProcDir.java  |   2 +-
 .../main/java/org/apache/doris/task/CloneTask.java |  19 +-
 .../org/apache/doris/clone/DecommissionTest.java   | 174 ++++++++++++++++
 .../org/apache/doris/clone/DiskRebalanceTest.java  |  36 ++--
 .../java/org/apache/doris/clone/RebalanceTest.java |   6 +-
 .../org/apache/doris/clone/RebalancerTestUtil.java |  23 +++
 .../doris/clone/RootPathLoadStatisticTest.java     |  14 +-
 .../doris/clone/TabletRepairAndBalanceTest.java    |  27 +--
 .../java/org/apache/doris/task/AgentTaskTest.java  |   3 +-
 20 files changed, 638 insertions(+), 199 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 3578c38726..7fd3695398 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -934,17 +934,19 @@ public class Config extends ConfigBase {
     public static long tablet_repair_delay_factor_second = 60;
 
     /**
-     * the default slot number per path in tablet scheduler
+     * the default slot number per path for hdd in tablet scheduler
      * TODO(cmy): remove this config and dynamically adjust it by clone task 
statistic
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static int schedule_slot_num_per_path = 4;
+    public static int schedule_slot_num_per_hdd_path = 4;
+
 
     /**
-     * the default slot number per path in tablet scheduler for decommission 
backend
+     * the default slot number per path for ssd in tablet scheduler
+     * TODO(cmy): remove this config and dynamically adjust it by clone task 
statistic
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static int schedule_decommission_slot_num_per_path = 8;
+    public static int schedule_slot_num_per_ssd_path = 8;
 
     /**
      * the default batch size in tablet scheduler for a single schedule.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index fdf45afe81..6263f53ebe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -68,6 +68,70 @@ public class BackendLoadStatistic {
         }
     }
 
+    public static class BePathLoadStatPair {
+        private BackendLoadStatistic beLoadStatistic;
+        private RootPathLoadStatistic pathLoadStatistic;
+
+        BePathLoadStatPair(BackendLoadStatistic beLoadStatistic, 
RootPathLoadStatistic pathLoadStatistic) {
+            this.beLoadStatistic = beLoadStatistic;
+            this.pathLoadStatistic = pathLoadStatistic;
+        }
+
+        BackendLoadStatistic getBackendLoadStatistic() {
+            return beLoadStatistic;
+        }
+
+        RootPathLoadStatistic getPathLoadStatistic() {
+            return pathLoadStatistic;
+        }
+
+        @Override
+        public String toString() {
+            return "{ beId: " + beLoadStatistic.getBeId() + ", be score: "
+                    + 
beLoadStatistic.getLoadScore(pathLoadStatistic.getStorageMedium())
+                    + ", path: " + pathLoadStatistic.getPath()
+                    + ", path used percent: " + 
pathLoadStatistic.getUsedPercent()
+                    + " }";
+        }
+    }
+
+    public static class BePathLoadStatPairComparator implements 
Comparator<BePathLoadStatPair> {
+        private double avgBackendLoadScore;
+        private double avgPathUsedPercent;
+
+        BePathLoadStatPairComparator(List<BePathLoadStatPair> loadStats) {
+            avgBackendLoadScore = 0.0;
+            avgPathUsedPercent = 0.0;
+            for (BePathLoadStatPair loadStat : loadStats) {
+                RootPathLoadStatistic pathStat = 
loadStat.getPathLoadStatistic();
+                avgBackendLoadScore += 
loadStat.getBackendLoadStatistic().getLoadScore(pathStat.getStorageMedium());
+                avgPathUsedPercent += pathStat.getUsedPercent();
+            }
+            if (!loadStats.isEmpty()) {
+                avgPathUsedPercent /= loadStats.size();
+                avgBackendLoadScore /= loadStats.size();
+            }
+            if (avgBackendLoadScore == 0.0) {
+                avgBackendLoadScore = 1.0;
+            }
+            if (avgPathUsedPercent == 0.0) {
+                avgPathUsedPercent = 1.0;
+            }
+        }
+
+        @Override
+        public int compare(BePathLoadStatPair o1, BePathLoadStatPair o2) {
+            return Double.compare(getCompareValue(o1), getCompareValue(o2));
+        }
+
+        private double getCompareValue(BePathLoadStatPair loadStat) {
+            BackendLoadStatistic beStat = loadStat.getBackendLoadStatistic();
+            RootPathLoadStatistic pathStat = loadStat.getPathLoadStatistic();
+            return 0.5 * beStat.getLoadScore(pathStat.getStorageMedium()) / 
avgBackendLoadScore
+                    + 0.5 * pathStat.getUsedPercent() / avgPathUsedPercent;
+        }
+    }
+
     public static final BeStatComparator HDD_COMPARATOR = new 
BeStatComparator(TStorageMedium.HDD);
     public static final BeStatComparator SSD_COMPARATOR = new 
BeStatComparator(TStorageMedium.SSD);
     public static final BeStatMixComparator MIX_COMPARATOR = new 
BeStatMixComparator();
@@ -362,9 +426,9 @@ public class BackendLoadStatistic {
             }
 
             result.add(pathStatistic);
-            return BalanceStatus.OK;
         }
-        return status;
+
+        return result.isEmpty() ? status : BalanceStatus.OK;
     }
 
     /**
@@ -456,6 +520,50 @@ public class BackendLoadStatistic {
                 beId, low.size(), mid.size(), high.size());
     }
 
+    public void getPathStatisticByClass(List<RootPathLoadStatistic> low,
+            List<RootPathLoadStatistic> mid, List<RootPathLoadStatistic> high, 
TStorageMedium storageMedium) {
+        for (RootPathLoadStatistic pathStat : pathStatistics) {
+            if (pathStat.getDiskState() == DiskState.OFFLINE
+                    || (storageMedium != null && pathStat.getStorageMedium() 
!= storageMedium)) {
+                continue;
+            }
+
+            if (pathStat.getClazz() == Classification.LOW) {
+                low.add(pathStat);
+            } else if (pathStat.getClazz() == Classification.HIGH) {
+                high.add(pathStat);
+            } else {
+                mid.add(pathStat);
+            }
+        }
+
+        LOG.debug("after adjust, backend {} path classification low/mid/high: 
{}/{}/{}",
+                beId, low.size(), mid.size(), high.size());
+    }
+
+    public void incrPathsCopingSize(Map<Long, Long> pathsCopingSize) {
+        boolean updated = false;
+        for (RootPathLoadStatistic pathStat : pathStatistics) {
+            Long copingSize = pathsCopingSize.get(pathStat.getPathHash());
+            if (copingSize != null && copingSize > 0) {
+                pathStat.incrCopingSizeB(copingSize);
+                updated = true;
+            }
+        }
+        if (updated) {
+            Collections.sort(pathStatistics);
+        }
+    }
+
+    public void incrPathCopingSize(long pathHash, long copingSize) {
+        RootPathLoadStatistic pathStat = pathStatistics.stream().filter(
+                p -> p.getPathHash() == pathHash).findFirst().orElse(null);
+        if (pathStat != null) {
+            pathStat.incrCopingSizeB(copingSize);
+            Collections.sort(pathStatistics);
+        }
+    }
+
     public List<RootPathLoadStatistic> getPathStatistics() {
         return pathStatistics;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 5317725881..d388e5fd79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
+import 
org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
 import org.apache.doris.clone.SchedException.Status;
 import org.apache.doris.clone.SchedException.SubCode;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
@@ -51,8 +53,9 @@ import java.util.Set;
 public class BeLoadRebalancer extends Rebalancer {
     private static final Logger LOG = 
LogManager.getLogger(BeLoadRebalancer.class);
 
-    public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex 
invertedIndex) {
-        super(infoService, invertedIndex);
+    public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex 
invertedIndex,
+            Map<Long, PathSlot> backendsWorkingSlots) {
+        super(infoService, invertedIndex, backendsWorkingSlots);
     }
 
     /*
@@ -100,9 +103,16 @@ public class BeLoadRebalancer extends Rebalancer {
             return alternativeTablets;
         }
 
-        // get the number of low load paths. and we should at most select this 
number of tablets
-        long numOfLowPaths = lowBEs.stream().filter(b -> b.isAvailable() && 
b.hasAvailDisk()).mapToLong(
-                b -> b.getAvailPathNum(medium)).sum();
+        long numOfLowPaths = 0;
+        for (BackendLoadStatistic backendLoadStatistic : lowBEs) {
+            if (!backendLoadStatistic.isAvailable()) {
+                continue;
+            }
+            PathSlot pathSlot = 
backendsWorkingSlots.get(backendLoadStatistic.getBeId());
+            if (pathSlot != null) {
+                numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum();
+            }
+        }
         LOG.info("get number of low load paths: {}, with medium: {}", 
numOfLowPaths, medium);
 
         int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
@@ -113,6 +123,10 @@ public class BeLoadRebalancer extends Rebalancer {
         OUTER:
         for (int i = highBEs.size() - 1; i >= 0; i--) {
             BackendLoadStatistic beStat = highBEs.get(i);
+            PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId());
+            if (pathSlot == null) {
+                continue;
+            }
 
             // classify the paths.
             Set<Long> pathLow = Sets.newHashSet();
@@ -129,7 +143,10 @@ public class BeLoadRebalancer extends Rebalancer {
             // for each path, we try to select at most 
BALANCE_SLOT_NUM_FOR_PATH tablets
             Map<Long, Integer> remainingPaths = Maps.newHashMap();
             for (Long pathHash : pathHigh) {
-                remainingPaths.put(pathHash, Config.balance_slot_num_per_path);
+                int availBalanceNum = 
pathSlot.getAvailableBalanceNum(pathHash);
+                if (availBalanceNum > 0) {
+                    remainingPaths.put(pathHash, availBalanceNum);
+                }
             }
 
             if (remainingPaths.isEmpty()) {
@@ -201,8 +218,7 @@ public class BeLoadRebalancer extends Rebalancer {
      * 2. Select a low load backend as destination. And tablet should not has 
replica on this backend.
      */
     @Override
-    public void completeSchedCtx(TabletSchedCtx tabletCtx,
-            Map<Long, PathSlot> backendsWorkingSlots) throws SchedException {
+    public void completeSchedCtx(TabletSchedCtx tabletCtx) throws 
SchedException {
         LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
         if (clusterStat == null) {
             throw new SchedException(Status.UNRECOVERABLE,
@@ -305,6 +321,7 @@ public class BeLoadRebalancer extends Rebalancer {
             throw new SchedException(Status.UNRECOVERABLE, "unable to find low 
backend");
         }
 
+        List<BePathLoadStatPair> candFitPaths = Lists.newArrayList();
         for (BackendLoadStatistic beStat : candidates) {
             PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
             if (slot == null) {
@@ -313,15 +330,26 @@ public class BeLoadRebalancer extends Rebalancer {
 
             // classify the paths.
             // And we only select path from 'low' and 'mid' paths
-            Set<Long> pathLow = Sets.newHashSet();
-            Set<Long> pathMid = Sets.newHashSet();
-            Set<Long> pathHigh = Sets.newHashSet();
+            List<RootPathLoadStatistic> pathLow = Lists.newArrayList();
+            List<RootPathLoadStatistic> pathMid = Lists.newArrayList();
+            List<RootPathLoadStatistic> pathHigh = Lists.newArrayList();
             beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, 
tabletCtx.getStorageMedium());
+
             pathLow.addAll(pathMid);
+            pathLow.stream().forEach(path -> candFitPaths.add(new 
BePathLoadStatPair(beStat, path)));
+        }
 
-            long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
-            if (pathHash != -1) {
-                tabletCtx.setDest(beStat.getBeId(), pathHash);
+        BePathLoadStatPairComparator comparator = new 
BePathLoadStatPairComparator(candFitPaths);
+        Collections.sort(candFitPaths, comparator);
+        for (BePathLoadStatPair bePathLoadStat : candFitPaths) {
+            BackendLoadStatistic beStat = 
bePathLoadStat.getBackendLoadStatistic();
+            RootPathLoadStatistic pathStat = 
bePathLoadStat.getPathLoadStatistic();
+            PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
+            if (slot == null) {
+                continue;
+            }
+            if (slot.takeBalanceSlot(pathStat.getPathHash()) != -1) {
+                tabletCtx.setDest(beStat.getBeId(), pathStat.getPathHash());
                 return;
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index abac0c2d1a..fe835b94af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -24,7 +24,6 @@ import org.apache.doris.clone.SchedException.Status;
 import org.apache.doris.clone.TabletSchedCtx.BalanceType;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
-import org.apache.doris.common.Config;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
 
@@ -52,8 +51,9 @@ import java.util.Set;
 public class DiskRebalancer extends Rebalancer {
     private static final Logger LOG = 
LogManager.getLogger(DiskRebalancer.class);
 
-    public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex 
invertedIndex) {
-        super(infoService, invertedIndex);
+    public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex 
invertedIndex,
+            Map<Long, PathSlot> backendsWorkingSlots) {
+        super(infoService, invertedIndex, backendsWorkingSlots);
     }
 
     public List<BackendLoadStatistic> 
filterByPrioBackends(List<BackendLoadStatistic> bes) {
@@ -152,6 +152,10 @@ public class DiskRebalancer extends Rebalancer {
         Collections.shuffle(midBEs);
         for (int i = midBEs.size() - 1; i >= 0; i--) {
             BackendLoadStatistic beStat = midBEs.get(i);
+            PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId());
+            if (pathSlot == null) {
+                continue;
+            }
 
             // classify the paths.
             Set<Long> pathLow = Sets.newHashSet();
@@ -171,7 +175,10 @@ public class DiskRebalancer extends Rebalancer {
             // for each path, we try to select at most 
BALANCE_SLOT_NUM_FOR_PATH tablets
             Map<Long, Integer> remainingPaths = Maps.newHashMap();
             for (Long pathHash : pathHigh) {
-                remainingPaths.put(pathHash, Config.balance_slot_num_per_path);
+                int availBalanceNum = 
pathSlot.getAvailableBalanceNum(pathHash);
+                if (availBalanceNum > 0) {
+                    remainingPaths.put(pathHash, availBalanceNum);
+                }
             }
 
             if (remainingPaths.isEmpty()) {
@@ -246,8 +253,7 @@ public class DiskRebalancer extends Rebalancer {
      * 3. Select a low load path from this backend as destination.
      */
     @Override
-    public void completeSchedCtx(TabletSchedCtx tabletCtx,
-            Map<Long, PathSlot> backendsWorkingSlots) throws SchedException {
+    public void completeSchedCtx(TabletSchedCtx tabletCtx) throws 
SchedException {
         LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
         if (clusterStat == null) {
             throw new SchedException(Status.UNRECOVERABLE,
@@ -323,7 +329,7 @@ public class DiskRebalancer extends Rebalancer {
             }
             long destPathHash = slot.takeBalanceSlot(stat.getPathHash());
             if (destPathHash == -1) {
-                throw new SchedException(Status.UNRECOVERABLE, "unable to take 
dest slot");
+                continue;
             }
             tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath());
             setDest = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
index 224399f480..413a3b129f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
@@ -342,6 +342,10 @@ public class LoadStatisticForTag {
         return null;
     }
 
+    public List<BackendLoadStatistic> getBackendLoadStatistics() {
+        return beLoadStatistics;
+    }
+
     /*
      * If cluster is balance, all Backends will be in 'mid', and 'high' and 
'low' is empty
      * If both 'high' and 'low' has Backends, just return
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index d9d3f27cc7..6c83944462 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.doris.clone;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.resource.Tag;
@@ -63,8 +64,9 @@ public class PartitionRebalancer extends Rebalancer {
     private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
     private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
 
-    public PartitionRebalancer(SystemInfoService infoService, 
TabletInvertedIndex invertedIndex) {
-        super(infoService, invertedIndex);
+    public PartitionRebalancer(SystemInfoService infoService, 
TabletInvertedIndex invertedIndex,
+            Map<Long, PathSlot> backendsWorkingSlots) {
+        super(infoService, invertedIndex, backendsWorkingSlots);
     }
 
     @Override
@@ -229,7 +231,7 @@ public class PartitionRebalancer extends Rebalancer {
     }
 
     @Override
-    protected void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, 
TabletScheduler.PathSlot> backendsWorkingSlots)
+    protected void completeSchedCtx(TabletSchedCtx tabletCtx)
             throws SchedException {
         MovesCacheMap.MovesCache movesInProgress = 
movesCacheMap.getCache(tabletCtx.getTag(),
                 tabletCtx.getStorageMedium());
@@ -271,10 +273,10 @@ public class PartitionRebalancer extends Rebalancer {
             if (pathHash == -1) {
                 throw new 
SchedException(SchedException.Status.SCHEDULE_FAILED, 
SchedException.SubCode.WAITING_SLOT,
                         "paths has no available balance slot: " + availPath);
-            } else {
-                tabletCtx.setDest(beStat.getBeId(), pathHash);
             }
 
+            tabletCtx.setDest(beStat.getBeId(), pathHash);
+
             // ToDeleteReplica is the source replica
             pair.second = srcReplica.getId();
         } catch (IllegalStateException | NullPointerException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index 4c3ef57546..bee23747bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -49,14 +49,17 @@ public abstract class Rebalancer {
     // When Rebalancer init, the statisticMap is usually empty. So it's no 
need to be an arg.
     // Only use updateLoadStatistic() to load stats.
     protected Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();
+    protected Map<Long, PathSlot> backendsWorkingSlots;
     protected TabletInvertedIndex invertedIndex;
     protected SystemInfoService infoService;
     // be id -> end time of prio
     protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
 
-    public Rebalancer(SystemInfoService infoService, TabletInvertedIndex 
invertedIndex) {
+    public Rebalancer(SystemInfoService infoService, TabletInvertedIndex 
invertedIndex,
+            Map<Long, PathSlot> backendsWorkingSlots) {
         this.infoService = infoService;
         this.invertedIndex = invertedIndex;
+        this.backendsWorkingSlots = backendsWorkingSlots;
     }
 
     public List<TabletSchedCtx> selectAlternativeTablets() {
@@ -74,9 +77,9 @@ public abstract class Rebalancer {
     protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
             LoadStatisticForTag clusterStat, TStorageMedium medium);
 
-    public AgentTask createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, 
PathSlot> backendsWorkingSlots)
+    public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
             throws SchedException {
-        completeSchedCtx(tabletCtx, backendsWorkingSlots);
+        completeSchedCtx(tabletCtx);
         if (tabletCtx.getBalanceType() == 
TabletSchedCtx.BalanceType.BE_BALANCE) {
             return tabletCtx.createCloneReplicaAndTask();
         } else {
@@ -90,7 +93,7 @@ public abstract class Rebalancer {
     // You should check the moves' validation.
     // 2. If you want to generate {srcReplica, destBe} here, just do it.
     // 3. You should check the path slots of src & dest.
-    protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx, 
Map<Long, PathSlot> backendsWorkingSlots)
+    protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx)
             throws SchedException;
 
     public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
index 3aeb4069ab..1a51276f7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
@@ -31,6 +31,7 @@ public class RootPathLoadStatistic implements 
Comparable<RootPathLoadStatistic>
     private TStorageMedium storageMedium;
     private long capacityB;
     private long usedCapacityB;
+    private long copingSizeB;
     private DiskState diskState;
 
     private Classification clazz = Classification.INIT;
@@ -43,6 +44,7 @@ public class RootPathLoadStatistic implements 
Comparable<RootPathLoadStatistic>
         this.storageMedium = storageMedium;
         this.capacityB = capacityB <= 0 ? 1 : capacityB;
         this.usedCapacityB = usedCapacityB;
+        this.copingSizeB = 0;
         this.diskState = diskState;
     }
 
@@ -71,7 +73,11 @@ public class RootPathLoadStatistic implements 
Comparable<RootPathLoadStatistic>
     }
 
     public double getUsedPercent() {
-        return capacityB <= 0 ? 0.0 : usedCapacityB / (double) capacityB;
+        return capacityB <= 0 ? 0.0 : (usedCapacityB + copingSizeB) / (double) 
capacityB;
+    }
+
+    public void incrCopingSizeB(long size) {
+        copingSizeB += size;
     }
 
     public void setClazz(Classification clazz) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 3c2d2b148c..4a3a1a6eb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -34,6 +34,7 @@ import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.ReplicaPersistInfo;
 import org.apache.doris.resource.Tag;
@@ -179,6 +180,8 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
     private long visibleVersion = -1;
     private long committedVersion = -1;
 
+    private long tabletSize = 0;
+
     private Replica srcReplica = null;
     private long srcPathHash = -1;
     // for disk balance to keep src path, and avoid take slot on 
selectAlternativeTablets
@@ -281,6 +284,10 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         return failedSchedCounter;
     }
 
+    public void resetFailedSchedCounter() {
+        failedSchedCounter = 0;
+    }
+
     public void increaseFailedRunningCounter() {
         ++failedRunningCounter;
     }
@@ -301,7 +308,7 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         } else {
             decommissionTime = -1;
             if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) {
-                return failedSchedCounter > 30 * 1000 / 
TabletScheduler.SCHEDULE_INTERVAL_MS;
+                return failedSchedCounter > 30 * 1000 / 
FeConstants.tablet_schedule_interval_ms;
             } else {
                 return failedSchedCounter > 10;
             }
@@ -477,13 +484,13 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
 
     // database lock should be held.
     public long getTabletSize() {
-        long max = Long.MIN_VALUE;
-        for (Replica replica : tablet.getReplicas()) {
-            if (replica.getDataSize() > max) {
-                max = replica.getDataSize();
-            }
-        }
-        return max;
+        return tabletSize;
+    }
+
+    public void updateTabletSize() {
+        tabletSize = 0;
+        tablet.getReplicas().stream().forEach(
+                replica -> tabletSize = Math.max(tabletSize, 
replica.getDataSize()));
     }
 
     /*
@@ -905,6 +912,7 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         // if this is a balance task, or this is a repair task with
         // REPLICA_MISSING/REPLICA_RELOCATING,
         // we create a new replica with state CLONE
+        long replicaId = 0;
         if (tabletStatus == TabletStatus.REPLICA_MISSING
                 || tabletStatus == TabletStatus.REPLICA_RELOCATING || type == 
Type.BALANCE
                 || tabletStatus == TabletStatus.COLOCATE_MISMATCH
@@ -917,14 +925,9 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
                     committedVersion, /* use committed version as last failed 
version */
                     -1 /* last success version */);
 
-            TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), 
srcBe.getHttpPort());
-            cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, 
indexId,
-                tabletId, cloneReplica.getId(), schemaHash, 
Lists.newArrayList(tSrcBe), storageMedium,
-                visibleVersion, (int) (taskTimeoutMs / 1000));
-            cloneTask.setPathHash(srcPathHash, destPathHash);
-
             // addReplica() method will add this replica to tablet inverted 
index too.
             tablet.addReplica(cloneReplica);
+            replicaId = cloneReplica.getId();
         } else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) {
             Preconditions.checkState(type == Type.REPAIR, type);
             // double check
@@ -937,18 +940,31 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
                 throw new SchedException(Status.SCHEDULE_FAILED, "dest 
replica's path hash is changed. "
                         + "current: " + replica.getPathHash() + ", scheduled: 
" + destPathHash);
             }
+            replicaId = replica.getId();
+        }
+
+        TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), 
srcBe.getHttpPort());
+        TBackend tDestBe = new TBackend(destBe.getHost(), destBe.getBePort(), 
destBe.getHttpPort());
 
-            TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), 
srcBe.getHttpPort());
-            cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, 
indexId,
-                tabletId, replica.getId(), schemaHash, 
Lists.newArrayList(tSrcBe), storageMedium,
+        cloneTask = new CloneTask(tDestBe, destBackendId, dbId, tblId, 
partitionId, indexId, tabletId,
+                replicaId, schemaHash, Lists.newArrayList(tSrcBe), 
storageMedium,
                 visibleVersion, (int) (taskTimeoutMs / 1000));
-            cloneTask.setPathHash(srcPathHash, destPathHash);
-        }
+        cloneTask.setPathHash(srcPathHash, destPathHash);
 
         this.state = State.RUNNING;
         return cloneTask;
     }
 
+    // for storage migration or cloning a new replica
+    public long getDestEstimatedCopingSize() {
+        if ((cloneTask != null && tabletStatus != 
TabletStatus.VERSION_INCOMPLETE)
+                || storageMediaMigrationTask != null) {
+            return Math.max(getTabletSize(), 10L);
+        } else {
+            return 0;
+        }
+    }
+
     // timeout is between MIN_CLONE_TASK_TIMEOUT_MS and 
MAX_CLONE_TASK_TIMEOUT_MS
     private long getApproximateTimeoutMs() {
         long tabletSize = getTabletSize();
@@ -1131,6 +1147,8 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         result.add(TimeUtils.longToTimeString(lastSchedTime));
         result.add(TimeUtils.longToTimeString(lastVisitedTime));
         result.add(TimeUtils.longToTimeString(finishedTime));
+        Pair<Double, String> tabletSizeDesc = 
DebugUtil.getByteUint(tabletSize);
+        
result.add(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(tabletSizeDesc.first) + " " 
+ tabletSizeDesc.second);
         result.add(copyTimeMs > 0 ? String.valueOf(copySize / copyTimeMs / 
1000.0) : FeConstants.null_string);
         result.add(String.valueOf(failedSchedCounter));
         result.add(String.valueOf(failedRunningCounter));
@@ -1162,8 +1180,9 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
             value += 5 * 1000L;
         }
 
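+        // repair tasks always take priority over balance tasks
+        // NOTE(review): the other offsets here look like milliseconds; 10 * 24 * 3600L is 864000 (~14.4 min), not 10 days - confirm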
         if (type == Type.BALANCE) {
-            value += 30 * 60 * 1000L;
+            value += 10 * 24 * 3600L;
         }
 
         return value;
@@ -1174,12 +1193,19 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         StringBuilder sb = new StringBuilder();
         sb.append("tablet id: ").append(tabletId).append(", status: 
").append(tabletStatus.name());
         sb.append(", state: ").append(state.name()).append(", type: 
").append(type.name());
+        if (type == Type.BALANCE && balanceType != null) {
+            sb.append(", balance: ").append(balanceType.name());
+        }
+        if (priority != null) {
+            sb.append(", priority: ").append(priority.name());
+        }
+        sb.append(", tablet size: ").append(tabletSize);
         if (srcReplica != null) {
-            sb.append(". from backend: ").append(srcReplica.getBackendId());
+            sb.append(", from backend: ").append(srcReplica.getBackendId());
             sb.append(", src path hash: ").append(srcPathHash);
         }
         if (destPathHash != -1) {
-            sb.append(". to backend: ").append(destBackendId);
+            sb.append(", to backend: ").append(destBackendId);
             sb.append(", dest path hash: ").append(destPathHash);
         }
         sb.append(", visible version: ").append(visibleVersion);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 1bc2f54337..d791b53475 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.AdminRebalanceDiskStmt;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.DiskInfo.DiskState;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -36,6 +35,8 @@ import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.Tablet.TabletStatus;
 import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
+import 
org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
 import org.apache.doris.clone.SchedException.Status;
 import org.apache.doris.clone.SchedException.SubCode;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
@@ -58,6 +59,7 @@ import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.transaction.DatabaseTransactionMgr;
 import org.apache.doris.transaction.TransactionState;
 
@@ -72,12 +74,12 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * TabletScheduler saved the tablets produced by TabletChecker and try to 
schedule them.
@@ -103,8 +105,6 @@ public class TabletScheduler extends MasterDaemon {
     // the minimum interval of updating cluster statistics and priority of 
tablet info
     private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s
 
-    public static final long SCHEDULE_INTERVAL_MS = 100;
-
     /*
      * Tablet is added to pendingTablets as well it's id in allTabletTypes.
      * TabletScheduler will take tablet from pendingTablets but will not 
remove it's id from allTabletTypes when
@@ -127,12 +127,11 @@ public class TabletScheduler extends MasterDaemon {
     private Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
     // Tag -> load statistic
     private Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();
+
     private long lastStatUpdateTime = 0;
 
     private long lastSlotAdjustTime = 0;
 
-    private long lastCheckTimeoutTime = 0;
-
     private Env env;
     private SystemInfoService infoService;
     private TabletInvertedIndex invertedIndex;
@@ -151,19 +150,19 @@ public class TabletScheduler extends MasterDaemon {
 
     public TabletScheduler(Env env, SystemInfoService infoService, 
TabletInvertedIndex invertedIndex,
                            TabletSchedulerStat stat, String rebalancerType) {
-        super("tablet scheduler", SCHEDULE_INTERVAL_MS);
+        super("tablet scheduler", FeConstants.tablet_schedule_interval_ms);
         this.env = env;
         this.infoService = infoService;
         this.invertedIndex = invertedIndex;
         this.colocateTableIndex = env.getColocateTableIndex();
         this.stat = stat;
         if (rebalancerType.equalsIgnoreCase("partition")) {
-            this.rebalancer = new PartitionRebalancer(infoService, 
invertedIndex);
+            this.rebalancer = new PartitionRebalancer(infoService, 
invertedIndex, backendsWorkingSlots);
         } else {
-            this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
+            this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex, 
backendsWorkingSlots);
         }
         // if rebalancer can not get new task, then use diskRebalancer to get 
task
-        this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex);
+        this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex, 
backendsWorkingSlots);
     }
 
     public TabletSchedulerStat getStat() {
@@ -190,10 +189,11 @@ public class TabletScheduler extends MasterDaemon {
         Set<Long> deletedBeIds = Sets.newHashSet();
         for (Long beId : backendsWorkingSlots.keySet()) {
             if (backends.containsKey(beId)) {
-                List<Long> pathHashes = 
backends.get(beId).getDisks().values().stream()
+                Map<Long, TStorageMedium> paths = Maps.newHashMap();
+                backends.get(beId).getDisks().values().stream()
                         .filter(v -> v.getState() == DiskState.ONLINE)
-                        
.map(DiskInfo::getPathHash).collect(Collectors.toList());
-                backendsWorkingSlots.get(beId).updatePaths(pathHashes);
+                        .forEach(v -> paths.put(v.getPathHash(), 
v.getStorageMedium()));
+                backendsWorkingSlots.get(beId).updatePaths(paths);
             } else {
                 deletedBeIds.add(beId);
             }
@@ -208,9 +208,11 @@ public class TabletScheduler extends MasterDaemon {
         // add new backends
         for (Backend be : backends.values()) {
             if (!backendsWorkingSlots.containsKey(be.getId())) {
-                List<Long> pathHashes = be.getDisks().values().stream()
-                        
.map(DiskInfo::getPathHash).collect(Collectors.toList());
-                PathSlot slot = new PathSlot(pathHashes, be.getId());
+                Map<Long, TStorageMedium> paths = Maps.newHashMap();
+                be.getDisks().values().stream()
+                        .filter(v -> v.getState() == DiskState.ONLINE)
+                        .forEach(v -> paths.put(v.getPathHash(), 
v.getStorageMedium()));
+                PathSlot slot = new PathSlot(paths, be.getId());
                 backendsWorkingSlots.put(be.getId(), slot);
                 LOG.info("add new backend {} with slots num: {}", be.getId(), 
be.getDisks().size());
             }
@@ -261,9 +263,7 @@ public class TabletScheduler extends MasterDaemon {
 
         pendingTablets.offer(tablet);
         if (!contains) {
-            LOG.info("Add tablet to pending queue, tablet id {}, type {}, 
status {}, priority {}",
-                    tablet.getTabletId(), tablet.getType(), 
tablet.getTabletStatus(),
-                    tablet.getPriority());
+            LOG.info("Add tablet to pending queue, {}", tablet);
         }
 
         return AddResult.ADDED;
@@ -319,24 +319,16 @@ public class TabletScheduler extends MasterDaemon {
             return;
         }
 
-        if (System.currentTimeMillis() - lastCheckTimeoutTime >= 1000L) {
-            updateLoadStatisticsAndPriorityIfNecessary();
-            handleRunningTablets();
-            selectTabletsForBalance();
-            lastCheckTimeoutTime = System.currentTimeMillis();
-        }
-
+        updateLoadStatistics();
+        handleRunningTablets();
+        selectTabletsForBalance();
         schedulePendingTablets();
 
         stat.counterTabletScheduleRound.incrementAndGet();
     }
 
 
-    private void updateLoadStatisticsAndPriorityIfNecessary() {
-        if (System.currentTimeMillis() - lastStatUpdateTime < 
STAT_UPDATE_INTERVAL_MS) {
-            return;
-        }
-
+    private void updateLoadStatistics() {
         updateLoadStatistic();
         rebalancer.updateLoadStatistic(statisticMap);
         diskRebalancer.updateLoadStatistic(statisticMap);
@@ -359,6 +351,12 @@ public class TabletScheduler extends MasterDaemon {
             newStatisticMap.put(tag, loadStatistic);
             LOG.debug("update load statistic for tag {}:\n{}", tag, 
loadStatistic.getBrief());
         }
+        Map<Long, Long> pathsCopingSize = getPathsCopingSize();
+        for (LoadStatisticForTag loadStatistic : newStatisticMap.values()) {
+            for (BackendLoadStatistic beLoadStatistic : 
loadStatistic.getBackendLoadStatistics()) {
+                beLoadStatistic.incrPathsCopingSize(pathsCopingSize);
+            }
+        }
 
         this.statisticMap = newStatisticMap;
     }
@@ -584,6 +582,7 @@ public class TabletScheduler extends MasterDaemon {
             // we do not concern priority here.
             // once we take the tablet out of priority queue, priority is 
meaningless.
             tabletCtx.setTablet(tablet);
+            tabletCtx.updateTabletSize();
             tabletCtx.setVersionInfo(partition.getVisibleVersion(), 
partition.getCommittedVersion());
             tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId()));
             
tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium());
@@ -691,6 +690,7 @@ public class TabletScheduler extends MasterDaemon {
 
         // create clone task
         batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
+        incrDestPathCopingSize(tabletCtx);
     }
 
     // In dealing with the case of missing replicas, we need to select a tag 
with missing replicas
@@ -782,6 +782,7 @@ public class TabletScheduler extends MasterDaemon {
     private void handleReplicaRelocating(TabletSchedCtx tabletCtx, 
AgentBatchTask batchTask)
             throws SchedException {
         stat.counterReplicaUnavailableErr.incrementAndGet();
+        tabletCtx.setTabletStatus(TabletStatus.VERSION_INCOMPLETE);
         handleReplicaVersionIncomplete(tabletCtx, batchTask);
     }
 
@@ -1202,6 +1203,7 @@ public class TabletScheduler extends MasterDaemon {
 
         // create clone task
         batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
+        incrDestPathCopingSize(tabletCtx);
     }
 
     /**
@@ -1214,16 +1216,23 @@ public class TabletScheduler extends MasterDaemon {
             return;
         }
 
-        long numOfBalancingTablets = getBalanceTabletsNumber();
-        if (numOfBalancingTablets > Config.max_balancing_tablets) {
-            LOG.info("number of balancing tablets {} exceed limit: {}, skip 
selecting tablets for balance",
-                    numOfBalancingTablets, Config.max_balancing_tablets);
+        // No need to prefetch too many balance tasks into the pending queue,
+        // because balance tasks are re-selected on every schedule round.
+        int needAddBalanceNum = Math.min(Config.schedule_batch_size - 
getPendingNum(),
+                    Config.max_balancing_tablets - getBalanceTabletsNumber());
+        if (needAddBalanceNum <= 0) {
             return;
         }
 
         List<TabletSchedCtx> alternativeTablets = 
rebalancer.selectAlternativeTablets();
+        Collections.shuffle(alternativeTablets);
         for (TabletSchedCtx tabletCtx : alternativeTablets) {
-            addTablet(tabletCtx, false);
+            if (addTablet(tabletCtx, false) == AddResult.ADDED) {
+                needAddBalanceNum--;
+                if (needAddBalanceNum <= 0) {
+                    return;
+                }
+            }
         }
         if (Config.disable_disk_balance) {
             LOG.info("disk balance is disabled. skip selecting tablets for 
disk balance");
@@ -1237,7 +1246,12 @@ public class TabletScheduler extends MasterDaemon {
         for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
             // add if task from prio backend or cluster is balanced
             if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == 
TabletSchedCtx.Priority.NORMAL) {
-                addTablet(tabletCtx, false);
+                if (addTablet(tabletCtx, false) == AddResult.ADDED) {
+                    needAddBalanceNum--;
+                    if (needAddBalanceNum <= 0) {
+                        break;
+                    }
+                }
             }
         }
     }
@@ -1249,16 +1263,17 @@ public class TabletScheduler extends MasterDaemon {
         stat.counterBalanceSchedule.incrementAndGet();
         AgentTask task = null;
         if (tabletCtx.getBalanceType() == 
TabletSchedCtx.BalanceType.DISK_BALANCE) {
-            task = diskRebalancer.createBalanceTask(tabletCtx, 
backendsWorkingSlots);
+            task = diskRebalancer.createBalanceTask(tabletCtx);
             checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), 
tabletCtx.getSrcPathHash());
             checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), 
tabletCtx.getDestPathHash());
         } else if (tabletCtx.getBalanceType() == 
TabletSchedCtx.BalanceType.BE_BALANCE) {
-            task = rebalancer.createBalanceTask(tabletCtx, 
backendsWorkingSlots);
+            task = rebalancer.createBalanceTask(tabletCtx);
         } else {
             throw new SchedException(Status.UNRECOVERABLE,
                 "unknown balance type: " + 
tabletCtx.getBalanceType().toString());
         }
         batchTask.addTask(task);
+        incrDestPathCopingSize(tabletCtx);
     }
 
     // choose a path on a backend which is fit for the tablet
@@ -1294,7 +1309,7 @@ public class TabletScheduler extends MasterDaemon {
 
         // get all available paths which this tablet can fit in.
         // beStatistics is sorted by mix load score in ascend order, so select 
from first to last.
-        List<RootPathLoadStatistic> allFitPaths = Lists.newArrayList();
+        List<BePathLoadStatPair> allFitPaths = Lists.newArrayList();
         for (BackendLoadStatistic bes : beStatistics) {
             if (!bes.isAvailable()) {
                 LOG.debug("backend {} is not available, skip. tablet: {}", 
bes.getBeId(), tabletCtx.getTabletId());
@@ -1343,18 +1358,21 @@ public class TabletScheduler extends MasterDaemon {
                 }
             }
 
-            Preconditions.checkState(resultPaths.size() == 1);
-            allFitPaths.add(resultPaths.get(0));
+            resultPaths.stream().forEach(path -> allFitPaths.add(new 
BePathLoadStatPair(bes, path)));
         }
 
         if (allFitPaths.isEmpty()) {
             throw new SchedException(Status.UNRECOVERABLE, "unable to find 
dest path for new replica");
         }
 
+        BePathLoadStatPairComparator comparator = new 
BePathLoadStatPairComparator(allFitPaths);
+        Collections.sort(allFitPaths, comparator);
+
         // all fit paths has already been sorted by load score in 
'allFitPaths' in ascend order.
         // just get first available path.
         // we try to find a path with specified media type, if not find, 
arbitrarily use one.
-        for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
+        for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
+            RootPathLoadStatistic rootPathLoadStatistic = 
bePathLoadStat.getPathLoadStatistic();
             if (rootPathLoadStatistic.getStorageMedium() != 
tabletCtx.getStorageMedium()) {
                 LOG.debug("backend {}'s path {}'s storage medium {} "
                                 + "is not equal to tablet's storage medium {}, 
skip. tablet: {}",
@@ -1385,7 +1403,8 @@ public class TabletScheduler extends MasterDaemon {
         boolean hasBePath = false;
 
         // no root path with specified media type is found, get arbitrary one.
-        for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
+        for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
+            RootPathLoadStatistic rootPathLoadStatistic = 
bePathLoadStat.getPathLoadStatistic();
             PathSlot slot = 
backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
             if (slot == null) {
                 LOG.debug("backend {}'s path {}'s slot is null, skip. tablet: 
{}",
@@ -1622,7 +1641,10 @@ public class TabletScheduler extends MasterDaemon {
                 tabletCtx.increaseFailedRunningCounter();
                 if (!tabletCtx.isExceedFailedRunningLimit()) {
                     stat.counterCloneTaskFailed.incrementAndGet();
-                    addToRunningTablets(tabletCtx);
+                    tabletCtx.releaseResource(this);
+                    tabletCtx.resetFailedSchedCounter();
+                    tabletCtx.setState(TabletSchedCtx.State.PENDING);
+                    addBackToPendingTablets(tabletCtx);
                     return false;
                 } else {
                     // unrecoverable
@@ -1767,9 +1789,42 @@ public class TabletScheduler extends MasterDaemon {
         return allTabletTypes.size();
     }
 
-    public synchronized long getBalanceTabletsNumber() {
-        return pendingTablets.stream().filter(t -> t.getType() == 
Type.BALANCE).count()
-                + runningTablets.values().stream().filter(t -> t.getType() == 
Type.BALANCE).count();
+    public synchronized int getBalanceTabletsNumber() {
+        return (int) (pendingTablets.stream().filter(t -> t.getType() == 
Type.BALANCE).count()
+                + runningTablets.values().stream().filter(t -> t.getType() == 
Type.BALANCE).count());
+    }
+
+    private synchronized Map<Long, Long> getPathsCopingSize() {
+        Map<Long, Long> pathsCopingSize = Maps.newHashMap();
+        for (TabletSchedCtx tablet : runningTablets.values()) {
+            long pathHash = tablet.getDestPathHash();
+            if (pathHash == 0 || pathHash == -1) {
+                continue;
+            }
+
+            long copingSize = tablet.getDestEstimatedCopingSize();
+            if (copingSize > 0) {
+                Long size = pathsCopingSize.getOrDefault(pathHash, 0L);
+                pathsCopingSize.put(pathHash, size + copingSize);
+            }
+        }
+        return pathsCopingSize;
+    }
+
+    private void incrDestPathCopingSize(TabletSchedCtx tablet) {
+        long destPathHash = tablet.getDestPathHash();
+        if (destPathHash == -1 || destPathHash == 0) {
+            return;
+        }
+
+        for (LoadStatisticForTag loadStatistic : statisticMap.values()) {
+            BackendLoadStatistic beLoadStatistic = 
loadStatistic.getBackendLoadStatistics().stream()
+                    .filter(v -> v.getBeId() == 
tablet.getDestBackendId()).findFirst().orElse(null);
+            if (beLoadStatistic != null) {
+                beLoadStatistic.incrPathCopingSize(destPathHash, 
tablet.getDestEstimatedCopingSize());
+                break;
+            }
+        }
     }
 
     /**
@@ -1782,22 +1837,22 @@ public class TabletScheduler extends MasterDaemon {
         private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
         private long beId;
 
-        public PathSlot(List<Long> paths, long beId) {
+        public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
             this.beId = beId;
-            for (Long pathHash : paths) {
-                pathSlots.put(pathHash, new Slot(beId));
+            for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
+                pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
             }
         }
 
         // update the path
-        public synchronized void updatePaths(List<Long> paths) {
+        public synchronized void updatePaths(Map<Long, TStorageMedium> paths) {
             // delete non exist path
-            pathSlots.entrySet().removeIf(entry -> 
!paths.contains(entry.getKey()));
+            pathSlots.entrySet().removeIf(entry -> 
!paths.containsKey(entry.getKey()));
 
             // add new path
-            for (Long pathHash : paths) {
-                if (!pathSlots.containsKey(pathHash)) {
-                    pathSlots.put(pathHash, new Slot(beId));
+            for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
+                if (!pathSlots.containsKey(entry.getKey())) {
+                    pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
                 }
             }
         }
@@ -1829,6 +1884,20 @@ public class TabletScheduler extends MasterDaemon {
             return true;
         }
 
+        public synchronized boolean hasAvailableBalanceSlot(long pathHash) {
+            if (pathHash == -1) {
+                return false;
+            }
+            Slot slot = pathSlots.get(pathHash);
+            if (slot == null) {
+                return false;
+            }
+            if (slot.getAvailableBalance() == 0) {
+                return false;
+            }
+            return true;
+        }
+
         /**
          * If the specified 'pathHash' has available slot, decrease the slot 
number and return this path hash
          */
@@ -1872,27 +1941,27 @@ public class TabletScheduler extends MasterDaemon {
             return total;
         }
 
+        public synchronized int getTotalAvailBalanceSlotNum() {
+            int num = 0;
+            for (Slot slot : pathSlots.values()) {
+                num += slot.getAvailableBalance();
+            }
+            return num;
+        }
+
         /**
          * get path whose balance slot num is larger than 0
          */
         public synchronized Set<Long> getAvailPathsForBalance() {
             Set<Long> pathHashs = Sets.newHashSet();
             for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
-                if (entry.getValue().getBalanceAvailable() > 0) {
+                if (entry.getValue().getAvailableBalance() > 0) {
                     pathHashs.add(entry.getKey());
                 }
             }
             return pathHashs;
         }
 
-        public synchronized int getAvailBalanceSlotNum() {
-            int num = 0;
-            for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
-                num += entry.getValue().getBalanceAvailable();
-            }
-            return num;
-        }
-
         public synchronized List<List<String>> getSlotInfo(long beId) {
             List<List<String>> results = Lists.newArrayList();
             pathSlots.forEach((key, value) -> {
@@ -1901,13 +1970,18 @@ public class TabletScheduler extends MasterDaemon {
                 result.add(String.valueOf(key));
                 result.add(String.valueOf(value.getAvailable()));
                 result.add(String.valueOf(value.getTotal()));
-                result.add(String.valueOf(value.getBalanceAvailable()));
+                result.add(String.valueOf(value.getAvailableBalance()));
                 result.add(String.valueOf(value.getAvgRate()));
                 results.add(result);
             });
             return results;
         }
 
+        public synchronized int getAvailableBalanceNum(long pathHash) {
+            Slot slot = pathSlots.get(pathHash);
+            return slot != null ? slot.getAvailableBalance() : 0;
+        }
+
         public synchronized long takeBalanceSlot(long pathHash) {
             Slot slot = pathSlots.get(pathHash);
             if (slot == null) {
@@ -1980,10 +2054,10 @@ public class TabletScheduler extends MasterDaemon {
         // for disk balance
         public long diskBalanceLastSuccTime = 0;
 
-        private long beId;
+        private TStorageMedium storageMedium;
 
-        public Slot(long beId) {
-            this.beId = beId;
+        public Slot(TStorageMedium storageMedium) {
+            this.storageMedium = storageMedium;
             this.used = 0;
             this.balanceUsed = 0;
         }
@@ -1993,18 +2067,16 @@ public class TabletScheduler extends MasterDaemon {
         }
 
         public int getTotal() {
-            int total = Math.max(1, Config.schedule_slot_num_per_path);
-
-            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
-            if (be != null && be.isDecommissioned()) {
-                total = Math.max(1, 
Config.schedule_decommission_slot_num_per_path);
+            if (storageMedium == TStorageMedium.SSD) {
+                return Config.schedule_slot_num_per_ssd_path;
+            } else {
+                return Config.schedule_slot_num_per_hdd_path;
             }
-
-            return total;
         }
 
-        public int getBalanceAvailable() {
-            return Math.max(0, getBalanceTotal() - balanceUsed);
+        public int getAvailableBalance() {
+            int leftBalance = Math.max(0, getBalanceTotal() - balanceUsed);
+            return Math.min(leftBalance, getAvailable());
         }
 
         public int getBalanceTotal() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 59f3efa32f..789315c33f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -67,6 +67,8 @@ public class FeConstants {
     public static String null_string = "\\N";
 
     public static long tablet_checker_interval_ms = 20 * 1000L;
+    public static long tablet_schedule_interval_ms = 100L;
+
     public static String csv = "csv";
     public static String csv_with_names = "csv_with_names";
     public static String csv_with_names_and_types = "csv_with_names_and_types";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
index 4441a99431..67e24870f9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
@@ -37,7 +37,7 @@ public class TabletSchedulerDetailProcDir implements 
ProcDirInterface {
     public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>().add("TabletId")
             
.add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe")
             
.add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
-            
.add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
+            
.add("Finished").add("ReplicaSize").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
             .add("CmtVer").add("ErrMsg")
             .build();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
index 019fded640..60531ced30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
@@ -33,6 +33,7 @@ public class CloneTask extends AgentTask {
     private long replicaId;
     private List<TBackend> srcBackends;
     private TStorageMedium storageMedium;
+    private TBackend destBackend;
 
     private long visibleVersion;
 
@@ -43,10 +44,11 @@ public class CloneTask extends AgentTask {
 
     private int taskVersion = VERSION_1;
 
-    public CloneTask(long backendId, long dbId, long tableId, long 
partitionId, long indexId, long tabletId,
-            long replicaId, int schemaHash, List<TBackend> srcBackends, 
TStorageMedium storageMedium,
-            long visibleVersion, int timeoutS) {
+    public CloneTask(TBackend destBackend, long backendId, long dbId, long 
tableId, long partitionId,
+            long indexId, long tabletId, long replicaId, int schemaHash, 
List<TBackend> srcBackends,
+            TStorageMedium storageMedium, long visibleVersion, int timeoutS) {
         super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId, 
indexId, tabletId);
+        this.destBackend = destBackend;
         this.replicaId = replicaId;
         this.schemaHash = schemaHash;
         this.srcBackends = srcBackends;
@@ -95,15 +97,16 @@ public class CloneTask extends AgentTask {
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("tablet id: ").append(tabletId).append(", replica id: 
").append(replicaId).append(", schema hash: ")
-                .append(schemaHash);
+        sb.append("tablet id: ").append(tabletId)
+                .append(", replica id: ").append(replicaId)
+                .append(", schema hash: ").append(schemaHash);
         sb.append(", storageMedium: ").append(storageMedium.name());
         sb.append(", visible version: ").append(visibleVersion);
         sb.append(", src backend: ").append(srcBackends.get(0).getHost())
                 .append(", src path hash: ").append(srcPathHash);
-        sb.append(", src backend: 
").append(srcBackends.get(0).getHost()).append(", src path hash: ")
-                .append(srcPathHash);
-        sb.append(", dest backend: ").append(backendId).append(", dest path 
hash: ").append(destPathHash);
+        sb.append(", dest backend id: ").append(backendId)
+                .append(", dest backend: ").append(destBackend.getHost())
+                .append(", dest path hash: ").append(destPathHash);
         return sb.toString();
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
new file mode 100644
index 0000000000..43ea5340bc
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.analysis.AlterSystemStmt;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TDisk;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+public class DecommissionTest {
+    private static final Logger LOG = 
LogManager.getLogger(TabletReplicaTooSlowTest.class);
+    // use a unique dir so that it won't conflict with other unit tests which
+    // may also start a mocked Frontend
+    private static String runningDirBase = "fe";
+    private static String runningDir = runningDirBase + 
"/mocked/DecommissionTest/" + UUID.randomUUID() + "/";
+    private static ConnectContext connectContext;
+
+    private static Random random = new Random(System.currentTimeMillis());
+
+    private long id = 10086;
+
+    private final SystemInfoService systemInfoService = new 
SystemInfoService();
+    private final TabletInvertedIndex invertedIndex = new 
TabletInvertedIndex();
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
+        System.out.println(runningDir);
+        FeConstants.runningUnitTest = true;
+        FeConstants.tablet_checker_interval_ms = 200;
+        FeConstants.tablet_schedule_interval_ms = 2000;
+        Config.tablet_repair_delay_factor_second = 1;
+        Config.enable_round_robin_create_tablet = true;
+        Config.schedule_slot_num_per_hdd_path = 10000;
+        Config.max_scheduling_tablets = 10000;
+        Config.schedule_batch_size = 10000;
+        Config.disable_balance = true;
+        // 4 backends:
+        // 127.0.0.1
+        // 127.0.0.2
+        // 127.0.0.3
+        // 127.0.0.4
+        UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 4);
+        List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
+        for (Backend be : backends) {
+            Map<String, TDisk> backendDisks = Maps.newHashMap();
+            TDisk tDisk1 = new TDisk();
+            tDisk1.setRootPath("/home/doris1.HDD");
+            tDisk1.setDiskTotalCapacity(20000000);
+            tDisk1.setDataUsedCapacity(1);
+            tDisk1.setUsed(true);
+            tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity - 
tDisk1.data_used_capacity);
+            tDisk1.setPathHash(random.nextLong());
+            tDisk1.setStorageMedium(TStorageMedium.HDD);
+            backendDisks.put(tDisk1.getRootPath(), tDisk1);
+
+            TDisk tDisk2 = new TDisk();
+            tDisk2.setRootPath("/home/doris2.HHD");
+            tDisk2.setDiskTotalCapacity(20000000);
+            tDisk2.setDataUsedCapacity(1);
+            tDisk2.setUsed(true);
+            tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity - 
tDisk2.data_used_capacity);
+            tDisk2.setPathHash(random.nextLong());
+            tDisk2.setStorageMedium(TStorageMedium.HDD);
+            backendDisks.put(tDisk2.getRootPath(), tDisk2);
+
+            be.updateDisks(backendDisks);
+        }
+
+        connectContext = UtFrameUtils.createDefaultCtx();
+
+        // create database
+        String createDbStmtStr = "create database test;";
+        CreateDbStmt createDbStmt = (CreateDbStmt) 
UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
+        Env.getCurrentEnv().createDb(createDbStmt);
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        //UtFrameUtils.cleanDorisFeDir(runningDirBase);
+    }
+
+    private static void createTable(String sql) throws Exception {
+        CreateTableStmt createTableStmt = (CreateTableStmt) 
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Env.getCurrentEnv().createTable(createTableStmt);
+        RebalancerTestUtil.updateReplicaPathHash();
+    }
+
+    @Test
+    public void testDecommissionBackend() throws Exception {
+        // test that tablets are evenly redistributed after decommissioning a backend
+        String createStr = "create table test.tbl1\n"
+                + "(k1 date, k2 int)\n"
+                + "distributed by hash(k2) buckets 2400\n"
+                + "properties\n"
+                + "(\n"
+                + "    \"replication_num\" = \"1\"\n"
+                + ")";
+        ExceptionChecker.expectThrowsNoException(() -> createTable(createStr));
+        int totalReplicaNum = 1 * 2400;
+        checkBalance(1, totalReplicaNum, 4);
+
+        Backend backend = Env.getCurrentSystemInfo().getAllBackends().get(0);
+        String decommissionStmtStr = "alter system decommission backend \"" + 
backend.getHost()
+                + ":" + backend.getHeartbeatPort() + "\"";
+        AlterSystemStmt decommissionStmt =
+                (AlterSystemStmt) 
UtFrameUtils.parseAndAnalyzeStmt(decommissionStmtStr, connectContext);
+        
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
+
+        Assert.assertEquals(true, backend.isDecommissioned());
+
+        checkBalance(200, totalReplicaNum, 3);
+    }
+
+    void checkBalance(int tryTimes, int totalReplicaNum, int backendNum) 
throws Exception {
+        int beReplicaNum = totalReplicaNum / backendNum;
+        for (int i = 0; i < tryTimes; i++) {
+            List<Long> backendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
+            if (backendNum != backendIds.size() && i != tryTimes - 1) {
+                Thread.sleep(1000);
+                continue;
+            }
+
+            List<Integer> tabletNums = Lists.newArrayList();
+            for (long beId : backendIds) {
+                
tabletNums.add(Env.getCurrentInvertedIndex().getTabletNumByBackendId(beId));
+            }
+
+            Assert.assertEquals("tablet nums = " + tabletNums, backendNum, 
backendIds.size());
+            for (int tabletNum : tabletNums) {
+                Assert.assertEquals("tablet nums = " + tabletNums, 
beReplicaNum, tabletNum);
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
index 9d5ffe1e6d..457466d72a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
@@ -20,7 +20,6 @@ package org.apache.doris.clone;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.KeysType;
@@ -36,7 +35,6 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.resource.Tag;
-import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.StorageMediaMigrationTask;
@@ -60,7 +58,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
 public class DiskRebalanceTest {
@@ -79,10 +76,12 @@ public class DiskRebalanceTest {
     private final SystemInfoService systemInfoService = new 
SystemInfoService();
     private final TabletInvertedIndex invertedIndex = new 
TabletInvertedIndex();
     private Map<Tag, LoadStatisticForTag> statisticMap;
+    private Map<Long, PathSlot> backendsWorkingSlots = Maps.newHashMap();
 
     @Before
     public void setUp() throws Exception {
         Config.used_capacity_percent_max_diff = 1.0;
+        Config.balance_slot_num_per_path = 1;
         db = new Database(1, "test db");
         db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
         new Expectations() {
@@ -137,12 +136,19 @@ public class DiskRebalanceTest {
                 
Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1, 2, 
Lists.newArrayList(3L)));
     }
 
-    private void generateStatisticMap() {
+    private void generateStatisticsAndPathSlots() {
         LoadStatisticForTag loadStatistic = new 
LoadStatisticForTag(Tag.DEFAULT_BACKEND_TAG, systemInfoService,
                 invertedIndex);
         loadStatistic.init();
         statisticMap = Maps.newHashMap();
         statisticMap.put(Tag.DEFAULT_BACKEND_TAG, loadStatistic);
+        backendsWorkingSlots.clear();
+        for (BackendLoadStatistic beStat : 
loadStatistic.getSortedBeLoadStats(null)) {
+            Map<Long, TStorageMedium> paths = Maps.newHashMap();
+            beStat.getPathStatistics().stream().forEach(
+                    path -> paths.put(path.getPathHash(), 
path.getStorageMedium()));
+            backendsWorkingSlots.put(beStat.getBeId(), new PathSlot(paths, 
beStat.getBeId()));
+        }
     }
 
     private void createPartitionsForTable(OlapTable olapTable, 
MaterializedIndex index, Long partitionCount) {
@@ -187,8 +193,9 @@ public class DiskRebalanceTest {
         // case start
         Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", 
Level.DEBUG);
 
-        Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), 
Env.getCurrentInvertedIndex());
-        generateStatisticMap();
+        generateStatisticsAndPathSlots();
+        Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), 
Env.getCurrentInvertedIndex(),
+                backendsWorkingSlots);
         rebalancer.updateLoadStatistic(statisticMap);
         List<TabletSchedCtx> alternativeTablets = 
rebalancer.selectAlternativeTablets();
         // check alternativeTablets;
@@ -229,8 +236,9 @@ public class DiskRebalanceTest {
         // case start
         Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", 
Level.DEBUG);
 
-        Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), 
Env.getCurrentInvertedIndex());
-        generateStatisticMap();
+        generateStatisticsAndPathSlots();
+        Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), 
Env.getCurrentInvertedIndex(),
+                backendsWorkingSlots);
         rebalancer.updateLoadStatistic(statisticMap);
         for (Map.Entry<Tag, LoadStatisticForTag> s : statisticMap.entrySet()) {
             if (s.getValue() != null) {
@@ -240,16 +248,6 @@ public class DiskRebalanceTest {
         List<TabletSchedCtx> alternativeTablets = 
rebalancer.selectAlternativeTablets();
         // check alternativeTablets;
         Assert.assertEquals(2, alternativeTablets.size());
-        Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
-        for (Backend be : Env.getCurrentSystemInfo().getAllBackends()) {
-            if (!backendsWorkingSlots.containsKey(be.getId())) {
-                List<Long> pathHashes = 
be.getDisks().values().stream().map(DiskInfo::getPathHash)
-                        .collect(Collectors.toList());
-                PathSlot slot = new PathSlot(pathHashes, 
Config.schedule_slot_num_per_path);
-                backendsWorkingSlots.put(be.getId(), slot);
-            }
-        }
-
         for (TabletSchedCtx tabletCtx : alternativeTablets) {
             LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
             try {
@@ -259,7 +257,7 @@ public class DiskRebalanceTest {
                 
tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
                 tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // 
rebalance tablet should be healthy first
 
-                AgentTask task = rebalancer.createBalanceTask(tabletCtx, 
backendsWorkingSlots);
+                AgentTask task = rebalancer.createBalanceTask(tabletCtx);
                 if (tabletCtx.getTabletSize() == 0) {
                     Assert.fail("no exception");
                 } else {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
index c36ef531c2..fe47338398 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
@@ -196,7 +196,7 @@ public class RebalanceTest {
 
     @Test
     public void testPrioBackends() {
-        Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), 
Env.getCurrentInvertedIndex());
+        Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), 
Env.getCurrentInvertedIndex(), null);
         // add
         { // CHECKSTYLE IGNORE THIS LINE
             List<Backend> backends = Lists.newArrayList();
@@ -232,7 +232,7 @@ public class RebalanceTest {
         // Call runAfterCatalogReady manually instead of starting daemon thread
         TabletSchedulerStat stat = new TabletSchedulerStat();
         PartitionRebalancer rebalancer = new 
PartitionRebalancer(Env.getCurrentSystemInfo(),
-                Env.getCurrentInvertedIndex());
+                Env.getCurrentInvertedIndex(), null);
         TabletScheduler tabletScheduler = new TabletScheduler(env, 
systemInfoService, invertedIndex, stat, "");
         // The rebalancer inside the scheduler will use this rebalancer, for 
getToDeleteReplicaId
         Deencapsulation.setField(tabletScheduler, "rebalancer", rebalancer);
@@ -256,7 +256,7 @@ public class RebalanceTest {
                 tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // 
rebalance tablet should be healthy first
 
                 // createCloneReplicaAndTask, create replica will change 
invertedIndex too.
-                AgentTask task = rebalancer.createBalanceTask(tabletCtx, 
tabletScheduler.getBackendsWorkingSlots());
+                AgentTask task = rebalancer.createBalanceTask(tabletCtx);
                 batchTask.addTask(task);
             } catch (SchedException e) {
                 LOG.warn("schedule tablet {} failed: {}", 
tabletCtx.getTabletId(), e.getMessage());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 7d43a5fb77..95f71d0b51 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.doris.clone;
 
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
@@ -32,6 +33,7 @@ import org.apache.doris.thrift.TStorageMedium;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
 
 import java.util.List;
 import java.util.Map;
@@ -106,4 +108,25 @@ public class RebalancerTestUtil {
             invertedIndex.addReplica(tablet.getId(), replica);
         });
     }
+
+    public static void updateReplicaPathHash() {
+        Table<Long, Long, Replica> replicaMetaTable = 
Env.getCurrentInvertedIndex().getReplicaMetaTable();
+        for (Table.Cell<Long, Long, Replica> cell : 
replicaMetaTable.cellSet()) {
+            long beId = cell.getColumnKey();
+            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+            if (be == null) {
+                continue;
+            }
+            Replica replica = cell.getValue();
+            TabletMeta tabletMeta = 
Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
+            ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
+            for (DiskInfo diskInfo : diskMap.values()) {
+                if (diskInfo.getStorageMedium() == 
tabletMeta.getStorageMedium()) {
+                    replica.setPathHash(diskInfo.getPathHash());
+                    break;
+                }
+            }
+        }
+    }
+
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java
index a0d8dd94c0..efb22d333a 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java
@@ -31,18 +31,22 @@ public class RootPathLoadStatisticTest {
 
     @Test
     public void test() {
-        RootPathLoadStatistic usageLow = new RootPathLoadStatistic(0L, 
"/home/disk1", 12345L, TStorageMedium.HDD, 4096L,
+        RootPathLoadStatistic usage1 = new RootPathLoadStatistic(0L, 
"/home/disk1", 12345L, TStorageMedium.HDD, 4096L,
                 1024L, DiskState.ONLINE);
-        RootPathLoadStatistic usageHigh = new RootPathLoadStatistic(0L, 
"/home/disk2", 67890L, TStorageMedium.HDD,
+        RootPathLoadStatistic usage2 = new RootPathLoadStatistic(0L, 
"/home/disk2", 67890L, TStorageMedium.HDD,
                 4096L, 2048L, DiskState.ONLINE);
 
         List<RootPathLoadStatistic> list = Lists.newArrayList();
-        list.add(usageLow);
-        list.add(usageHigh);
+        list.add(usage1);
+        list.add(usage2);
 
         // low usage should be ahead
         Collections.sort(list);
-        Assert.assertTrue(list.get(0).getPathHash() == usageLow.getPathHash());
+        Assert.assertTrue(list.get(0).getPathHash() == usage1.getPathHash());
+
+        usage1.incrCopingSizeB(2048L);
+        Collections.sort(list);
+        Assert.assertTrue(list.get(1).getPathHash() == usage1.getPathHash());
     }
 
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
index 5c58954952..0e9f1446c7 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
@@ -26,7 +26,6 @@ import org.apache.doris.analysis.DropTableStmt;
 import org.apache.doris.catalog.ColocateGroupSchema;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
@@ -36,7 +35,6 @@ import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
-import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -54,7 +52,6 @@ import org.apache.doris.thrift.TDisk;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.utframe.UtFrameUtils;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Table;
@@ -162,7 +159,7 @@ public class TabletRepairAndBalanceTest {
         CreateTableStmt createTableStmt = (CreateTableStmt) 
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
         Env.getCurrentEnv().createTable(createTableStmt);
         // must set replicas' path hash, or the tablet scheduler won't work
-        updateReplicaPathHash();
+        RebalancerTestUtil.updateReplicaPathHash();
     }
 
     private static void dropTable(String sql) throws Exception {
@@ -170,26 +167,6 @@ public class TabletRepairAndBalanceTest {
         Env.getCurrentEnv().dropTable(dropTableStmt);
     }
 
-    private static void updateReplicaPathHash() {
-        Table<Long, Long, Replica> replicaMetaTable = 
Env.getCurrentInvertedIndex().getReplicaMetaTable();
-        for (Table.Cell<Long, Long, Replica> cell : 
replicaMetaTable.cellSet()) {
-            long beId = cell.getColumnKey();
-            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
-            if (be == null) {
-                continue;
-            }
-            Replica replica = cell.getValue();
-            TabletMeta tabletMeta = 
Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
-            ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
-            for (DiskInfo diskInfo : diskMap.values()) {
-                if (diskInfo.getStorageMedium() == 
tabletMeta.getStorageMedium()) {
-                    replica.setPathHash(diskInfo.getPathHash());
-                    break;
-                }
-            }
-        }
-    }
-
     private static void alterTable(String sql) throws Exception {
         AlterTableStmt alterTableStmt = (AlterTableStmt) 
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
         
Env.getCurrentEnv().getAlterInstance().processAlterTable(alterTableStmt);
@@ -498,7 +475,7 @@ public class TabletRepairAndBalanceTest {
         ExceptionChecker.expectThrowsNoException(() -> 
createTable(createStr6));
 
         OlapTable tbl3 = db.getOlapTableOrDdlException("col_tbl3");
-        updateReplicaPathHash();
+        RebalancerTestUtil.updateReplicaPathHash();
         // Set one replica's state as DECOMMISSION, see if it can be changed 
to NORMAL
         Tablet oneTablet = null;
         Replica oneReplica = null;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 86d3fda79f..b0b862e3dd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -114,7 +114,8 @@ public class AgentTaskTest {
 
         // clone
         cloneTask =
-                new CloneTask(backendId1, dbId, tableId, partitionId, 
indexId1, tabletId1, replicaId1, schemaHash1,
+                new CloneTask(new TBackend("host2", 8290, 8390), backendId1, 
dbId, tableId, partitionId,
+                        indexId1, tabletId1, replicaId1, schemaHash1,
                         Arrays.asList(new TBackend("host1", 8290, 8390)), 
TStorageMedium.HDD, -1, 3600);
 
         // storageMediaMigrationTask


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to