This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 04b547967bbcc73c3fdebaeba4c5e0b6f7177aa8
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Oct 19 09:33:42 2023 +0800

    [improvement](disk balance) impr disk rebalancer sched with partition rebalancer (#25549)
---
 .../main/java/org/apache/doris/clone/DiskRebalancer.java   | 14 +++++++++++---
 .../java/org/apache/doris/clone/PartitionRebalancer.java   | 12 ++++++++++++
 .../main/java/org/apache/doris/clone/TabletScheduler.java  |  4 ++++
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index e1bb690696c..63554e17b12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -28,6 +28,7 @@ import org.apache.doris.clone.SchedException.Status;
 import org.apache.doris.clone.TabletSchedCtx.BalanceType;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
+import org.apache.doris.common.Config;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
 
@@ -125,7 +126,14 @@ public class DiskRebalancer extends Rebalancer {
         List<BackendLoadStatistic> highBEs = Lists.newArrayList();
         clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
 
-        if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
+        if (Config.tablet_rebalancer_type.equalsIgnoreCase("partition")) {
+            PartitionRebalancer rebalancer = (PartitionRebalancer) Env.getCurrentEnv()
+                    .getTabletScheduler().getRebalancer();
+            if (rebalancer != null && rebalancer.checkCacheEmptyForLong()) {
+                midBEs.addAll(lowBEs);
+                midBEs.addAll(highBEs);
+            }
+        } else if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
             // the cluster is not balanced
             if (prioBackends.isEmpty()) {
                 LOG.info("cluster is not balanced with medium: {}. skip", medium);
@@ -142,13 +150,13 @@ public class DiskRebalancer extends Rebalancer {
         // if all mid backends is not available, we should not start balance
         if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
             LOG.debug("all mid load backends is dead: {} with medium: {}. skip",
-                    lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
+                    midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
             return alternativeTablets;
         }
 
         if (midBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) {
             LOG.info("all mid load backends {} have no available disk with medium: {}. skip",
-                    lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
+                    midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
             return alternativeTablets;
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index 6c83944462e..8dcf6fd5dbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -64,6 +64,8 @@ public class PartitionRebalancer extends Rebalancer {
     private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
     private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
 
+    private long cacheEmptyTimestamp = -1L;
+
     public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
             Map<Long, PathSlot> backendsWorkingSlots) {
         super(infoService, invertedIndex, backendsWorkingSlots);
@@ -230,6 +232,11 @@ public class PartitionRebalancer extends Rebalancer {
         return !bes.contains(move.fromBe) && bes.contains(move.toBe);
     }
 
+    // cache empty for 10 min
+    public boolean checkCacheEmptyForLong() {
+        return cacheEmptyTimestamp > 0 && System.currentTimeMillis() > cacheEmptyTimestamp + 10 * 60 * 1000L;
+    }
+
     @Override
     protected void completeSchedCtx(TabletSchedCtx tabletCtx)
             throws SchedException {
@@ -317,6 +324,11 @@ public class PartitionRebalancer extends Rebalancer {
         movesCacheMap.updateMapping(statisticMap, Config.partition_rebalance_move_expire_after_access);
         // Perform cache maintenance
         movesCacheMap.maintain();
+        if (movesCacheMap.size() > 0) {
+            cacheEmptyTimestamp = -1;
+        } else if (cacheEmptyTimestamp < 0) {
+            cacheEmptyTimestamp = System.currentTimeMillis();
+        }
         LOG.debug("Move succeeded/total :{}/{}, current {}",
                 counterBalanceMoveSucceeded.get(), counterBalanceMoveCreated.get(), movesCacheMap);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index ee9da3ac100..d89129bfddb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -170,6 +170,10 @@ public class TabletScheduler extends MasterDaemon {
         return stat;
     }
 
+    public Rebalancer getRebalancer() {
+        return rebalancer;
+    }
+
     /*
      * update working slots at the beginning of each round
      */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to