This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 04b547967bbcc73c3fdebaeba4c5e0b6f7177aa8 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Thu Oct 19 09:33:42 2023 +0800 [improvement](disk balance) impr disk rebalancer sched with partition rebalancer (#25549) --- .../main/java/org/apache/doris/clone/DiskRebalancer.java | 14 +++++++++++--- .../java/org/apache/doris/clone/PartitionRebalancer.java | 12 ++++++++++++ .../main/java/org/apache/doris/clone/TabletScheduler.java | 4 ++++ 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index e1bb690696c..63554e17b12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -28,6 +28,7 @@ import org.apache.doris.clone.SchedException.Status; import org.apache.doris.clone.TabletSchedCtx.BalanceType; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; +import org.apache.doris.common.Config; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -125,7 +126,14 @@ public class DiskRebalancer extends Rebalancer { List<BackendLoadStatistic> highBEs = Lists.newArrayList(); clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium); - if (!(lowBEs.isEmpty() && highBEs.isEmpty())) { + if (Config.tablet_rebalancer_type.equalsIgnoreCase("partition")) { + PartitionRebalancer rebalancer = (PartitionRebalancer) Env.getCurrentEnv() + .getTabletScheduler().getRebalancer(); + if (rebalancer != null && rebalancer.checkCacheEmptyForLong()) { + midBEs.addAll(lowBEs); + midBEs.addAll(highBEs); + } + } else if (!(lowBEs.isEmpty() && highBEs.isEmpty())) { // the cluster is not balanced if (prioBackends.isEmpty()) { LOG.info("cluster is not balanced with medium: {}. skip", medium); @@ -142,13 +150,13 @@ public class DiskRebalancer extends Rebalancer { // if all mid backends is not available, we should not start balance if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) { LOG.debug("all mid load backends is dead: {} with medium: {}. skip", - lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium); + midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium); return alternativeTablets; } if (midBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) { LOG.info("all mid load backends {} have no available disk with medium: {}. skip", - lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium); + midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium); return alternativeTablets; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index 6c83944462e..8dcf6fd5dbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -64,6 +64,8 @@ public class PartitionRebalancer extends Rebalancer { private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0); private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0); + private long cacheEmptyTimestamp = -1L; + public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex, Map<Long, PathSlot> backendsWorkingSlots) { super(infoService, invertedIndex, backendsWorkingSlots); @@ -230,6 +232,11 @@ public class PartitionRebalancer extends Rebalancer { return !bes.contains(move.fromBe) && bes.contains(move.toBe); } + // cache empty for 10 min + public boolean checkCacheEmptyForLong() { + return cacheEmptyTimestamp > 0 && System.currentTimeMillis() > cacheEmptyTimestamp + 10 * 60 * 1000L; + } + @Override protected void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { @@ -317,6 +324,11 @@ public class PartitionRebalancer extends Rebalancer { movesCacheMap.updateMapping(statisticMap, Config.partition_rebalance_move_expire_after_access); // Perform cache maintenance movesCacheMap.maintain(); + if (movesCacheMap.size() > 0) { + cacheEmptyTimestamp = -1; + } else if (cacheEmptyTimestamp < 0) { + cacheEmptyTimestamp = System.currentTimeMillis(); + } LOG.debug("Move succeeded/total :{}/{}, current {}", counterBalanceMoveSucceeded.get(), counterBalanceMoveCreated.get(), movesCacheMap); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index ee9da3ac100..d89129bfddb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -170,6 +170,10 @@ public class TabletScheduler extends MasterDaemon { return stat; } + public Rebalancer getRebalancer() { + return rebalancer; + } + /* * update working slots at the beginning of each round */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org