This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e58c6d0e872cede53d060774ec58fb7859184204 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Wed Feb 7 07:02:41 2024 +0800 [improvement](balance) fix multiple problems for balance on large cluster (#30713) --- .../main/java/org/apache/doris/common/Config.java | 31 +++- .../main/java/org/apache/doris/common/Pair.java | 2 +- .../apache/doris/catalog/TabletInvertedIndex.java | 19 ++- .../apache/doris/clone/BackendLoadStatistic.java | 68 ++++++-- .../org/apache/doris/clone/BeLoadRebalancer.java | 190 +++++++++++++++------ .../apache/doris/clone/LoadStatisticForTag.java | 166 +++++++++++++++++- .../apache/doris/clone/RootPathLoadStatistic.java | 30 +++- .../org/apache/doris/clone/TabletScheduler.java | 96 ++++++++--- .../common/proc/BackendLoadStatisticProcNode.java | 2 +- .../common/proc/ClusterLoadStatisticProcDir.java | 2 +- .../doris/clone/ClusterLoadStatisticsTest.java | 72 +++++--- 11 files changed, 544 insertions(+), 134 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index a7f642944ed..5a239a8c407 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1014,6 +1014,30 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static double balance_load_score_threshold = 0.1; // 10% + // if disk usage > balance_load_score_threshold + urgent_disk_usage_extra_threshold + // then this disk need schedule quickly + // this value could less than 0. 
+ @ConfField(mutable = true, masterOnly = true) + public static double urgent_balance_disk_usage_extra_threshold = 0.05; + + // when running urgent disk balance, shuffle this percentage of the largest tablets + // range: [ 0 ~ 100 ] + @ConfField(mutable = true, masterOnly = true) + public static int urgent_balance_shuffle_large_tablet_percentage = 1; + + @ConfField(mutable = true, masterOnly = true) + public static double urgent_balance_pick_large_tablet_num_threshold = 1000; + + // range: 0 ~ 100 + @ConfField(mutable = true, masterOnly = true) + public static int urgent_balance_pick_large_disk_usage_percentage = 80; + + // when every backend has a high-usage disk, there is no low backend to receive tablets. + // if this value is true (and there are at least two such backends), urgent disk balance + // still runs, and the high backends exchange tablets among themselves. + @ConfField(mutable = true, masterOnly = true) + public static boolean enable_urgent_balance_no_low_backend = true; + + /** * if set to true, TabletScheduler will not do balance. */ @@ -1024,7 +1048,7 @@ public class Config extends ConfigBase { * when be rebalancer idle, then disk balance will occurs. */ @ConfField(mutable = true, masterOnly = true) - public static int be_rebalancer_idle_seconds = 60; + public static int be_rebalancer_idle_seconds = 0; /** * if set to true, TabletScheduler will not do disk balance. @@ -1032,6 +1056,11 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static boolean disable_disk_balance = false; + // balance order + // ATTN: a temporary config, may be deleted later. + @ConfField(mutable = true, masterOnly = true) + public static boolean balance_be_then_disk = true; + /** * if set to false, TabletScheduler will not do disk balance for replica num = 1.
*/ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java b/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java index c3ae810582c..7d1736a0754 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Pair.java @@ -78,7 +78,7 @@ public class Pair<F, S> { return first.toString() + ":" + second.toString(); } - public static class PairComparator<T extends Pair<?, Comparable>> implements Comparator<T> { + public static class PairComparator<T extends Pair<?, ? extends Comparable>> implements Comparator<T> { @Override public int compare(T o1, T o2) { return o1.second.compareTo(o2.second); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index d2ea70873f7..8137dddae13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -644,19 +644,28 @@ public class TabletInvertedIndex { return tabletIds; } - public List<Long> getTabletIdsByBackendIdAndStorageMedium(long backendId, TStorageMedium storageMedium) { - List<Long> tabletIds = Lists.newArrayList(); + public List<Pair<Long, Long>> getTabletSizeByBackendIdAndStorageMedium(long backendId, + TStorageMedium storageMedium) { + List<Pair<Long, Long>> tabletIdSizes = Lists.newArrayList(); long stamp = readLock(); try { Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); if (replicaMetaWithBackend != null) { - tabletIds = replicaMetaWithBackend.keySet().stream().filter( - id -> tabletMetaMap.get(id).getStorageMedium() == storageMedium).collect(Collectors.toList()); + tabletIdSizes = replicaMetaWithBackend.entrySet().stream() + .filter(entry -> tabletMetaMap.get(entry.getKey()).getStorageMedium() == storageMedium) + .map(entry -> Pair.of(entry.getKey(), 
entry.getValue().getDataSize())) + .collect(Collectors.toList()); } } finally { readUnlock(stamp); } - return tabletIds; + return tabletIdSizes; + } + + public List<Long> getTabletIdsByBackendIdAndStorageMedium(long backendId, + TStorageMedium storageMedium) { + return getTabletSizeByBackendIdAndStorageMedium(backendId, storageMedium).stream() + .map(Pair::key).collect(Collectors.toList()); } public int getTabletNumByBackendId(long backendId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java index 9ac6fce7f2b..f1f632c997a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java @@ -39,6 +39,8 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class BackendLoadStatistic { private static final Logger LOG = LogManager.getLogger(BackendLoadStatistic.class); @@ -166,6 +168,7 @@ public class BackendLoadStatistic { private Map<TStorageMedium, Long> totalReplicaNumMap = Maps.newHashMap(); private Map<TStorageMedium, LoadScore> loadScoreMap = Maps.newHashMap(); private Map<TStorageMedium, Classification> clazzMap = Maps.newHashMap(); + private Map<TStorageMedium, Classification> maxDiskClazzMap = Maps.newHashMap(); private List<RootPathLoadStatistic> pathStatistics = Lists.newArrayList(); public BackendLoadStatistic(long beId, Tag tag, SystemInfoService infoService, @@ -227,6 +230,14 @@ public class BackendLoadStatistic { return clazzMap.getOrDefault(medium, Classification.INIT); } + public void setMaxDiskClazz(TStorageMedium medium, Classification clazz) { + this.maxDiskClazzMap.put(medium, clazz); + } + + public Classification getMaxDiskClazz(TStorageMedium medium) { + return maxDiskClazzMap.getOrDefault(medium, 
Classification.INIT); + } + public void init() throws LoadBalanceException { Backend be = infoService.getBackend(beId); if (be == null) { @@ -246,9 +257,17 @@ public class BackendLoadStatistic { + (diskInfo.getTotalCapacityB() - diskInfo.getAvailableCapacityB())); } + // Doris-compose put test all backends' disks on the same physical disk. + // Make a little change here. + long usedCapacityB = diskInfo.getDiskUsedCapacityB(); + if (Config.be_rebalancer_fuzzy_test) { + usedCapacityB = Math.min(diskInfo.getTotalCapacityB(), + usedCapacityB + Math.abs(diskInfo.getPathHash()) % 10000); + } + RootPathLoadStatistic pathStatistic = new RootPathLoadStatistic(beId, diskInfo.getRootPath(), diskInfo.getPathHash(), diskInfo.getStorageMedium(), - diskInfo.getTotalCapacityB(), diskInfo.getDiskUsedCapacityB(), diskInfo.getState()); + diskInfo.getTotalCapacityB(), usedCapacityB, diskInfo.getState()); pathStatistics.add(pathStatistic); } @@ -295,14 +314,14 @@ public class BackendLoadStatistic { if (Math.abs(pathStat.getUsedPercent() - avgUsedPercent) > Math.max(avgUsedPercent * Config.balance_load_score_threshold, 0.025)) { if (pathStat.getUsedPercent() > avgUsedPercent) { - pathStat.setClazz(Classification.HIGH); + pathStat.setLocalClazz(Classification.HIGH); highCounter++; } else if (pathStat.getUsedPercent() < avgUsedPercent) { - pathStat.setClazz(Classification.LOW); + pathStat.setLocalClazz(Classification.LOW); lowCounter++; } } else { - pathStat.setClazz(Classification.MID); + pathStat.setLocalClazz(Classification.MID); midCounter++; } } @@ -422,14 +441,19 @@ public class BackendLoadStatistic { BalanceStatus bStatus = pathStatistic.isFit(tabletSize, isSupplement); if (!bStatus.ok()) { - status.addErrMsgs(bStatus.getErrMsgs()); + if (status != BalanceStatus.OK) { + status.addErrMsgs(bStatus.getErrMsgs()); + } continue; } - result.add(pathStatistic); + if (result != null) { + result.add(pathStatistic); + } + status = BalanceStatus.OK; } - return result.isEmpty() ? 
status : BalanceStatus.OK; + return status; } /** @@ -508,9 +532,9 @@ public class BackendLoadStatistic { continue; } - if (pathStat.getClazz() == Classification.LOW) { + if (pathStat.getLocalClazz() == Classification.LOW) { low.add(pathStat.getPathHash()); - } else if (pathStat.getClazz() == Classification.HIGH) { + } else if (pathStat.getLocalClazz() == Classification.HIGH) { high.add(pathStat.getPathHash()); } else { mid.add(pathStat.getPathHash()); @@ -529,9 +553,9 @@ public class BackendLoadStatistic { continue; } - if (pathStat.getClazz() == Classification.LOW) { + if (pathStat.getLocalClazz() == Classification.LOW) { low.add(pathStat); - } else if (pathStat.getClazz() == Classification.HIGH) { + } else if (pathStat.getLocalClazz() == Classification.HIGH) { high.add(pathStat); } else { mid.add(pathStat); @@ -569,9 +593,22 @@ public class BackendLoadStatistic { return pathStatistics; } - public long getAvailPathNum(TStorageMedium medium) { - return pathStatistics.stream().filter( - p -> p.getDiskState() == DiskState.ONLINE && p.getStorageMedium() == medium).count(); + RootPathLoadStatistic getPathStatisticByPathHash(long pathHash) { + return pathStatistics.stream().filter(pathStat -> pathStat.getPathHash() == pathHash) + .findFirst().orElse(null); + } + + public List<RootPathLoadStatistic> getAvailPaths(TStorageMedium medium) { + return getAvailPathStream(medium).collect(Collectors.toList()); + } + + public boolean hasAvailPathWithGlobalClazz(TStorageMedium medium, Classification globalClazz) { + return getAvailPathStream(medium).anyMatch(pathStat -> pathStat.getGlobalClazz() == globalClazz); + } + + private Stream<RootPathLoadStatistic> getAvailPathStream(TStorageMedium medium) { + return pathStatistics.stream() + .filter(p -> p.getDiskState() == DiskState.ONLINE && p.getStorageMedium() == medium); } public boolean hasMedium(TStorageMedium medium) { @@ -603,6 +640,7 @@ public class BackendLoadStatistic { long total = totalCapacityMap.getOrDefault(medium, 0L); 
info.add(String.valueOf(used)); info.add(String.valueOf(total)); + info.add(getMaxDiskClazz(medium).name()); info.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(used * 100 / (double) total))); info.add(String.valueOf(totalReplicaNumMap.getOrDefault(medium, 0L))); @@ -610,7 +648,7 @@ public class BackendLoadStatistic { info.add(String.valueOf(loadScore.capacityCoefficient)); info.add(String.valueOf(loadScore.getReplicaNumCoefficient())); info.add(String.valueOf(loadScore.score)); - info.add(clazzMap.getOrDefault(medium, Classification.INIT).name()); + info.add(getClazz(medium).name()); return info; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index e7b6211bd79..b40d7f7a512 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -25,12 +25,14 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair; import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator; +import org.apache.doris.clone.BackendLoadStatistic.Classification; import org.apache.doris.clone.SchedException.Status; import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -45,6 +47,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /* * BeLoadRebalancer strategy: @@ -79,12 +82,9 @@ public class 
BeLoadRebalancer extends Rebalancer { protected List<TabletSchedCtx> selectAlternativeTabletsForCluster( LoadStatisticForTag clusterStat, TStorageMedium medium) { List<TabletSchedCtx> alternativeTablets = Lists.newArrayList(); - - // get classification of backends List<BackendLoadStatistic> lowBEs = Lists.newArrayList(); - List<BackendLoadStatistic> midBEs = Lists.newArrayList(); List<BackendLoadStatistic> highBEs = Lists.newArrayList(); - clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium); + boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, highBEs, medium); if (lowBEs.isEmpty() && highBEs.isEmpty()) { LOG.debug("cluster is balance with medium: {}. skip", medium); @@ -117,6 +117,8 @@ public class BeLoadRebalancer extends Rebalancer { } LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); + List<String> alternativeTabletInfos = Lists.newArrayList(); + // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) // so in clone ut recycleBin need to set to null. CatalogRecycleBin recycleBin = null; @@ -125,6 +127,10 @@ public class BeLoadRebalancer extends Rebalancer { } int clusterAvailableBEnum = infoService.getAllBackendIds(true).size(); ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); + List<Set<Long>> lowBETablets = lowBEs.stream() + .map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId()))) + .collect(Collectors.toList()); + // choose tablets from high load backends. // BackendLoadStatistic is sorted by load score in ascend order, // so we need to traverse it from last to first @@ -136,37 +142,73 @@ public class BeLoadRebalancer extends Rebalancer { continue; } - // classify the paths. 
- Set<Long> pathLow = Sets.newHashSet(); - Set<Long> pathMid = Sets.newHashSet(); - Set<Long> pathHigh = Sets.newHashSet(); - beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium); - // we only select tablets from available mid and high load path - pathHigh.addAll(pathMid); - - // get all tablets on this backend, and shuffle them for random selection - List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium); - Collections.shuffle(tabletIds); + boolean choseHighDisk = isUrgent && beStat.getMaxDiskClazz(medium) == Classification.HIGH; // for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets Map<Long, Integer> remainingPaths = Maps.newHashMap(); + Set<Long> pathHigh = null; + if (choseHighDisk) { + pathHigh = beStat.getAvailPaths(medium).stream().filter(RootPathLoadStatistic::isGlobalHighUsage) + .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet()); + } else { + // classify the paths. + pathHigh = Sets.newHashSet(); + Set<Long> pathLow = Sets.newHashSet(); + Set<Long> pathMid = Sets.newHashSet(); + beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium); + // we only select tablets from available mid and high load path + pathHigh.addAll(pathMid); + } + + double highDiskMaxUsage = 0; for (Long pathHash : pathHigh) { int availBalanceNum = pathSlot.getAvailableBalanceNum(pathHash); if (availBalanceNum > 0) { remainingPaths.put(pathHash, availBalanceNum); } + + RootPathLoadStatistic pathStat = beStat.getPathStatisticByPathHash(pathHash); + if (pathStat != null) { + highDiskMaxUsage = Math.max(highDiskMaxUsage, pathStat.getUsedPercent()); + } } + LOG.debug("high be {}, medium: {}, path high: {}, remainingPaths: {}, chose high disk: {}", + beStat.getBeId(), medium, pathHigh, remainingPaths, choseHighDisk); + if (remainingPaths.isEmpty()) { continue; } + // get all tablets on this backend, and shuffle them for random selection + List<Pair<Long, Long>> tabletIdSizes = 
invertedIndex.getTabletSizeByBackendIdAndStorageMedium( + beStat.getBeId(), medium); + if (!isUrgent + || tabletIdSizes.size() < Config.urgent_balance_pick_large_tablet_num_threshold + || highDiskMaxUsage < (double) Config.urgent_balance_pick_large_disk_usage_percentage / 100.0 + || Config.urgent_balance_shuffle_large_tablet_percentage >= 100 + || Config.urgent_balance_shuffle_large_tablet_percentage < 0) { + Collections.shuffle(tabletIdSizes); + } else { + Collections.sort(tabletIdSizes, new Pair.PairComparator<Pair<Long, Long>>()); + if (Config.urgent_balance_shuffle_large_tablet_percentage > 0) { + int startIndex = (int) (tabletIdSizes.size() + * (1 - (double) Config.urgent_balance_shuffle_large_tablet_percentage / 100.0)); + Collections.shuffle(tabletIdSizes.subList(startIndex, tabletIdSizes.size())); + } + } + // select tablet from shuffled tablets - for (Long tabletId : tabletIds) { + for (int j = tabletIdSizes.size() - 1; j >= 0; j--) { + long tabletId = tabletIdSizes.get(j).key(); if (clusterAvailableBEnum <= invertedIndex.getReplicasByTabletId(tabletId).size()) { continue; } + if (alternativeTablets.stream().anyMatch(tabletCtx -> tabletId == tabletCtx.getTabletId())) { + continue; + } + Replica replica = invertedIndex.getReplica(tabletId, beStat.getBeId()); if (replica == null) { continue; @@ -186,20 +228,40 @@ public class BeLoadRebalancer extends Rebalancer { continue; } + // for urgent disk, pick tablets order by size, + // then it may always pick tablets that was on the low backends. 
+ if (!lowBETablets.isEmpty() + && lowBETablets.stream().allMatch(tablets -> tablets.contains(tabletId))) { + continue; + } + if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId())) { continue; } + boolean isFit = lowBEs.stream().anyMatch(be -> be.isFit(replica.getDataSize(), + medium, null, false) == BalanceStatus.OK); + if (!isFit) { + if (LOG.isDebugEnabled()) { + LOG.debug("tablet {} with size {} medium {} not fit in low backends", + tabletId, replica.getDataSize(), medium); + } + continue; + } + TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(), tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance*/, System.currentTimeMillis()); tabletCtx.setTag(clusterStat.getTag()); // balance task's priority is always LOW - tabletCtx.setPriority(Priority.LOW); + tabletCtx.setPriority(isUrgent ? Priority.NORMAL : Priority.LOW); alternativeTablets.add(tabletCtx); + alternativeTabletInfos.add("{ tabletId=" + tabletId + ", beId=" + beStat.getBeId() + + ", pathHash=" + replica.getPathHash() + + ", replicaLocalSize=" + replica.getDataSize() + " }"); if (--numOfLowPaths <= 0) { // enough break OUTER; @@ -217,13 +279,13 @@ public class BeLoadRebalancer extends Rebalancer { } // end for high backends if (!alternativeTablets.isEmpty()) { - LOG.info("select alternative tablets, medium: {}, num: {}, detail: {}", - medium, alternativeTablets.size(), - alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray()); + LOG.info("select alternative tablets, medium: {}, is urgent: {}, num: {}, detail: {}", + medium, isUrgent, alternativeTablets.size(), alternativeTabletInfos); } return alternativeTablets; } + /* * Create a clone task of this selected tablet for balance. * 1. Check if this tablet has replica on high load backend. If not, the balance will be cancelled. 
@@ -239,17 +301,17 @@ public class BeLoadRebalancer extends Rebalancer { } // get classification of backends - List<BackendLoadStatistic> lowBe = Lists.newArrayList(); - List<BackendLoadStatistic> midBe = Lists.newArrayList(); - List<BackendLoadStatistic> highBe = Lists.newArrayList(); - clusterStat.getBackendStatisticByClass(lowBe, midBe, highBe, tabletCtx.getStorageMedium()); + List<BackendLoadStatistic> lowBEs = Lists.newArrayList(); + List<BackendLoadStatistic> highBEs = Lists.newArrayList(); + boolean isUrgent = clusterStat.getLowHighBEsWithIsUrgent(lowBEs, highBEs, tabletCtx.getStorageMedium()); + String isUrgentInfo = isUrgent ? " for urgent" : " for non-urgent"; - if (lowBe.isEmpty() && highBe.isEmpty()) { + if (lowBEs.isEmpty() && highBEs.isEmpty()) { throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "cluster is balance"); } // if all low backends is not available, return - if (lowBe.stream().noneMatch(BackendLoadStatistic::isAvailable)) { + if (lowBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) { throw new SchedException(Status.UNRECOVERABLE, "all low load backends is unavailable"); } @@ -258,21 +320,21 @@ public class BeLoadRebalancer extends Rebalancer { // Check if this tablet has replica on high load backend. // Also create a set to save hosts of this tablet. 
Set<String> hosts = Sets.newHashSet(); - boolean hasHighReplica = false; - for (Replica replica : replicas) { - if (highBe.stream().anyMatch(b -> b.getBeId() == replica.getBackendId())) { - hasHighReplica = true; + List<BackendLoadStatistic> replicaHighBEs = Lists.newArrayList(); + for (BackendLoadStatistic beStat : highBEs) { + if (replicas.stream().anyMatch(replica -> beStat.getBeId() == replica.getBackendId())) { + replicaHighBEs.add(beStat); } - Backend be = infoService.getBackend(replica.getBackendId()); + Backend be = infoService.getBackend(beStat.getBeId()); if (be == null) { throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, - "backend is dropped: " + replica.getBackendId()); + "backend is dropped: " + beStat.getBeId()); } hosts.add(be.getHost()); } - if (!hasHighReplica) { + if (replicaHighBEs.isEmpty()) { throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, - "no replica on high load backend"); + "no replica on high load backend" + isUrgentInfo); } // select a replica as source @@ -290,12 +352,12 @@ public class BeLoadRebalancer extends Rebalancer { } } if (!setSource) { - throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "unable to take src backend slot"); + throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot" + isUrgentInfo); } // Select a low load backend as destination. List<BackendLoadStatistic> candidates = Lists.newArrayList(); - for (BackendLoadStatistic beStat : lowBe) { + for (BackendLoadStatistic beStat : lowBEs) { if (beStat.isAvailable() && replicas.stream().noneMatch(r -> r.getBackendId() == beStat.getBeId())) { // check if on same host. Backend lowBackend = infoService.getBackend(beStat.getBeId()); @@ -308,18 +370,22 @@ public class BeLoadRebalancer extends Rebalancer { // no replica on this low load backend // 1. check if this clone task can make the cluster more balance. 
- List<RootPathLoadStatistic> availPaths = Lists.newArrayList(); - BalanceStatus bs; - if ((bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), availPaths, - false /* not supplement */)) != BalanceStatus.OK) { - LOG.debug("tablet not fit in BE {}, reason: {}", beStat.getBeId(), bs.getErrMsgs()); + BalanceStatus bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), null, + false /* not supplement */); + if (bs != BalanceStatus.OK) { + LOG.debug("tablet not fit in BE {}, reason: {}, {}", + beStat.getBeId(), bs.getErrMsgs(), isUrgentInfo); continue; } - if (!Config.be_rebalancer_fuzzy_test && !clusterStat.isMoreBalanced( - tabletCtx.getSrcBackendId(), beStat.getBeId(), tabletCtx.getTabletId(), - tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) { - continue; + if (!Config.be_rebalancer_fuzzy_test && !isUrgent) { + boolean moreBalanced = replicaHighBEs.stream().anyMatch(highBeStat -> + clusterStat.isMoreBalanced(highBeStat.getBeId(), beStat.getBeId(), + tabletCtx.getTabletId(), tabletCtx.getTabletSize(), + tabletCtx.getStorageMedium())); + if (!moreBalanced) { + continue; + } } PathSlot slot = backendsWorkingSlots.get(beStat.getBeId()); @@ -333,7 +399,8 @@ public class BeLoadRebalancer extends Rebalancer { } if (candidates.isEmpty()) { - throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "unable to find low dest backend"); + throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, + "unable to find low dest backend" + isUrgentInfo); } List<BePathLoadStatPair> candFitPaths = Lists.newArrayList(); @@ -343,15 +410,27 @@ public class BeLoadRebalancer extends Rebalancer { continue; } - // classify the paths. 
- // And we only select path from 'low' and 'mid' paths - List<RootPathLoadStatistic> pathLow = Lists.newArrayList(); - List<RootPathLoadStatistic> pathMid = Lists.newArrayList(); - List<RootPathLoadStatistic> pathHigh = Lists.newArrayList(); - beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); + List<RootPathLoadStatistic> pathLow = null; + if (isUrgent) { + pathLow = beStat.getAvailPaths(tabletCtx.getStorageMedium()).stream() + .filter(RootPathLoadStatistic::isGlobalLowUsage) + .collect(Collectors.toList()); + } else { + // classify the paths. + // And we only select path from 'low' and 'mid' paths + pathLow = Lists.newArrayList(); + List<RootPathLoadStatistic> pathMid = Lists.newArrayList(); + List<RootPathLoadStatistic> pathHigh = Lists.newArrayList(); + beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); + + pathLow.addAll(pathMid); + } + pathLow.forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path))); + } - pathLow.addAll(pathMid); - pathLow.stream().forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path))); + if (candFitPaths.isEmpty()) { + throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, + "unable to find low dest backend to fit in paths" + isUrgentInfo); } BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(candFitPaths); @@ -359,6 +438,7 @@ public class BeLoadRebalancer extends Rebalancer { for (BePathLoadStatPair bePathLoadStat : candFitPaths) { BackendLoadStatistic beStat = bePathLoadStat.getBackendLoadStatistic(); RootPathLoadStatistic pathStat = bePathLoadStat.getPathLoadStatistic(); + PathSlot slot = backendsWorkingSlots.get(beStat.getBeId()); if (slot == null) { continue; @@ -370,7 +450,7 @@ public class BeLoadRebalancer extends Rebalancer { } throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, - "beload waiting for dest backend slot"); + "unable to take dest slot" + 
isUrgentInfo); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java index 158f2cde4af..faf9704a902 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.clone.BackendLoadStatistic.Classification; import org.apache.doris.clone.BackendLoadStatistic.LoadScore; import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.resource.Tag; @@ -38,6 +39,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.BiConsumer; import java.util.stream.Collectors; /* @@ -144,6 +146,7 @@ public class LoadStatisticForTag { // classify all backends for (TStorageMedium medium : TStorageMedium.values()) { classifyBackendByLoad(medium); + classifyBackendByMaxDiskUsage(medium); } // sort be stats by mix load score @@ -245,6 +248,84 @@ public class LoadStatisticForTag { medium, avgLoadScore, lowCounter, midCounter, highCounter); } + private void classifyBackendByMaxDiskUsage(TStorageMedium medium) { + calcDiskGlobalUsages(medium); + Classification[] clazzs = { Classification.HIGH, Classification.LOW, Classification.MID }; + for (BackendLoadStatistic beStat : beLoadStatistics) { + if (!beStat.hasMedium(medium)) { + continue; + } + for (Classification clazz : clazzs) { + if (beStat.hasAvailPathWithGlobalClazz(medium, clazz)) { + beStat.setMaxDiskClazz(medium, clazz); + break; + } + } + } + } + + private void calcDiskGlobalUsages(TStorageMedium medium) { + double urgentDiffUsageThreshold; + if (Config.be_rebalancer_fuzzy_test) { + urgentDiffUsageThreshold = 0; + } else 
{ + urgentDiffUsageThreshold = Config.balance_load_score_threshold + + Config.urgent_balance_disk_usage_extra_threshold; + if (urgentDiffUsageThreshold <= 0) { + return; + } + } + + double totalDiskUsages = 0; + int totalDiskNum = 0; + for (BackendLoadStatistic beStat : getBackendLoadStatistics()) { + if (!beStat.isAvailable()) { + continue; + } + for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) { + if (pathStat.getCapacityB() > 1L) { + totalDiskUsages += pathStat.getUsedPercent(); + totalDiskNum++; + } + } + } + + if (totalDiskNum == 0) { + return; + } + + double avgDiskUsage = totalDiskUsages / totalDiskNum; + double urgentDiskUsage = avgDiskUsage + urgentDiffUsageThreshold; + + boolean hasHighDisk = false; + for (BackendLoadStatistic beStat : getBackendLoadStatistics()) { + if (!beStat.isAvailable()) { + continue; + } + for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) { + if (pathStat.getCapacityB() > 1L) { + double usage = pathStat.getUsedPercent(); + if (usage > urgentDiskUsage) { + pathStat.setGlobalClazz(Classification.HIGH); + hasHighDisk = true; + } else if (usage > avgDiskUsage) { + pathStat.setGlobalClazz(Classification.MID); + } else { + pathStat.setGlobalClazz(Classification.LOW); + } + } + } + } + + if (!hasHighDisk) { + for (BackendLoadStatistic beStat : getBackendLoadStatistics()) { + for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) { + pathStat.setGlobalClazz(Classification.MID); + } + } + } + } + private static void sortBeStats(List<BackendLoadStatistic> beStats, TStorageMedium medium) { if (medium == null) { Collections.sort(beStats, BackendLoadStatistic.MIX_COMPARATOR); @@ -353,7 +434,8 @@ public class LoadStatisticForTag { pathStat.add(String.valueOf(pathStatistic.getCapacityB())); pathStat.add(String.valueOf(DebugUtil.DECIMAL_FORMAT_SCALE_3.format( pathStatistic.getUsedCapacityB() * 100 / (double) pathStatistic.getCapacityB()))); - pathStat.add(pathStatistic.getClazz().name()); + 
pathStat.add(pathStatistic.getLocalClazz().name()); + pathStat.add(pathStatistic.getGlobalClazz().name()); pathStat.add(pathStatistic.getDiskState().name()); statistics.add(pathStat); } @@ -375,6 +457,88 @@ public class LoadStatisticForTag { return beLoadStatistics; } + public boolean getLowHighBEsWithIsUrgent(List<BackendLoadStatistic> lowBEs, List<BackendLoadStatistic> highBEs, + TStorageMedium medium) { + if (getUrgentLowHighBEs(lowBEs, highBEs, medium)) { + return true; + } else { + lowBEs.clear(); + highBEs.clear(); + List<BackendLoadStatistic> midBEs = Lists.newArrayList(); + getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium); + return false; + } + } + + private boolean getUrgentLowHighBEs(List<BackendLoadStatistic> lowBEs, List<BackendLoadStatistic> highBEs, + TStorageMedium medium) { + List<BackendLoadStatistic> midBEs = Lists.newArrayList(); + for (BackendLoadStatistic beStat : getBackendLoadStatistics()) { + if (!beStat.isAvailable()) { + continue; + } + switch (beStat.getMaxDiskClazz(medium)) { + case LOW: + lowBEs.add(beStat); + break; + case MID: + midBEs.add(beStat); + break; + case HIGH: + highBEs.add(beStat); + break; + default: + break; + } + } + + if (lowBEs.isEmpty()) { + lowBEs.addAll(midBEs); + } + + if (lowBEs.isEmpty() && highBEs.size() > 1 && Config.enable_urgent_balance_no_low_backend) { + // all backend will exchange tablets among themselves. 
+ lowBEs.addAll(highBEs); + } + + if (lowBEs.isEmpty() || highBEs.isEmpty()) { + lowBEs.clear(); + highBEs.clear(); + return false; + } + + BiConsumer<List<BackendLoadStatistic>, Boolean> resortBeStats = (beStats, choseMinPathElseMaxPath) -> { + List<Pair<BackendLoadStatistic, Double>> bePairs = Lists.newArrayList(); + for (BackendLoadStatistic beStat : beStats) { + double score = -1.0; + for (RootPathLoadStatistic pathStat : beStat.getAvailPaths(medium)) { + if (pathStat.getCapacityB() > 1) { + double usage = pathStat.getUsedPercent(); + if (score < 0 || (choseMinPathElseMaxPath && usage < score) + || (!choseMinPathElseMaxPath && usage > score)) { + score = usage; + } + } + } + bePairs.add(Pair.of(beStat, score)); + } + Collections.sort(bePairs, new Pair.PairComparator<Pair<BackendLoadStatistic, Double>>()); + + beStats.clear(); + bePairs.forEach(pair -> beStats.add(pair.key())); + }; + + resortBeStats.accept(lowBEs, true); + resortBeStats.accept(highBEs, false); + + LOG.debug("urgent backends' classification lowBe {}, highBe {}, medium: {}", + lowBEs.stream().map(BackendLoadStatistic::getBeId).collect(Collectors.toList()), + highBEs.stream().map(BackendLoadStatistic::getBeId).collect(Collectors.toList()), + medium); + + return true; + } + /* * If cluster is balance, all Backends will be in 'mid', and 'high' and 'low' is empty * If both 'high' and 'low' has Backends, just return diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java index d2f1983a831..3e94bebcfef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java @@ -34,7 +34,11 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic> private long copingSizeB; private DiskState diskState; - private Classification clazz = Classification.INIT; + // localClazz is 
compare with other disks on the same backend + private Classification localClazz = Classification.INIT; + + // globalClazz is compare with other disks on all backends + private Classification globalClazz = Classification.INIT; public RootPathLoadStatistic(long beId, String path, Long pathHash, TStorageMedium storageMedium, long capacityB, long usedCapacityB, DiskState diskState) { @@ -80,12 +84,28 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic> copingSizeB += size; } - public void setClazz(Classification clazz) { - this.clazz = clazz; + public void setLocalClazz(Classification clazz) { + this.localClazz = clazz; + } + + public Classification getLocalClazz() { + return localClazz; + } + + public void setGlobalClazz(Classification clazz) { + this.globalClazz = clazz; + } + + public Classification getGlobalClazz() { + return globalClazz; + } + + public boolean isGlobalHighUsage() { + return globalClazz == Classification.HIGH; } - public Classification getClazz() { - return clazz; + public boolean isGlobalLowUsage() { + return globalClazz == Classification.LOW; } public DiskState getDiskState() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 3f09a65b45e..4cab2fd8bee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -842,6 +842,7 @@ public class TabletScheduler extends MasterDaemon { || deleteReplicaOnSameHost(tabletCtx, force) || deleteReplicaNotInValidTag(tabletCtx, force) || deleteReplicaChosenByRebalancer(tabletCtx, force) + || deleteReplicaOnUrgentHighDisk(tabletCtx, force) || deleteReplicaOnHighLoadBackend(tabletCtx, force)) { // if we delete at least one redundant replica, we still throw a SchedException with status FINISHED // to remove this tablet from the pendingTablets(consider it as finished) @@ -990,6 
+991,34 @@ public class TabletScheduler extends MasterDaemon { return true; } + private boolean deleteReplicaOnUrgentHighDisk(TabletSchedCtx tabletCtx, boolean force) throws SchedException { + Tag tag = chooseProperTag(tabletCtx, false); + LoadStatisticForTag statistic = statisticMap.get(tag); + if (statistic == null) { + return false; + } + + Replica chosenReplica = null; + double maxUsages = -1; + for (Replica replica : tabletCtx.getReplicas()) { + BackendLoadStatistic beStatistic = statistic.getBackendLoadStatistic(replica.getBackendId()); + if (beStatistic == null) { + continue; + } + RootPathLoadStatistic path = beStatistic.getPathStatisticByPathHash(replica.getPathHash()); + if (path != null && path.isGlobalHighUsage() && path.getUsedPercent() > maxUsages) { + maxUsages = path.getUsedPercent(); + chosenReplica = replica; + } + } + + if (chosenReplica != null) { + deleteReplicaInternal(tabletCtx, chosenReplica, "high usage disk", force); + return true; + } + return false; + } + private boolean deleteReplicaOnHighLoadBackend(TabletSchedCtx tabletCtx, boolean force) throws SchedException { Tag tag = chooseProperTag(tabletCtx, false); LoadStatisticForTag statistic = statisticMap.get(tag); @@ -1037,7 +1066,7 @@ public class TabletScheduler extends MasterDaemon { } if (chosenReplica != null) { - deleteReplicaInternal(tabletCtx, chosenReplica, "high load", force); + deleteReplicaInternal(tabletCtx, chosenReplica, "high load backend", force); return true; } return false; @@ -1251,48 +1280,71 @@ public class TabletScheduler extends MasterDaemon { return; } - // No need to prefetch too many balance task to pending queue. - // Because for every sched, it will re select the balance task. - int needAddBalanceNum = Math.min(Config.schedule_batch_size - getPendingNum(), - Config.max_balancing_tablets - getBalanceTabletsNumber()); - if (needAddBalanceNum <= 0) { - return; + // TODO: too ugly, remove balance_be_then_disk later. 
+ if (Config.balance_be_then_disk) { + boolean hasBeBalance = selectTabletsForBeBalance(); + selectTabletsForDiskBalance(hasBeBalance); + } else { + selectTabletsForDiskBalance(false); + selectTabletsForBeBalance(); + } + } + + private boolean selectTabletsForBeBalance() { + int limit = getBalanceSchedQuotoLeft(); + if (limit <= 0) { + return false; } + int addNum = 0; List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets(); Collections.shuffle(alternativeTablets); for (TabletSchedCtx tabletCtx : alternativeTablets) { - if (needAddBalanceNum > 0 && addTablet(tabletCtx, false) == AddResult.ADDED) { - needAddBalanceNum--; + if (addNum >= limit) { + break; + } + if (addTablet(tabletCtx, false) == AddResult.ADDED) { + addNum++; } else { rebalancer.onTabletFailed(tabletCtx); } } - if (needAddBalanceNum <= 0) { - return; - } + return addNum > 0; + } + + private void selectTabletsForDiskBalance(boolean hasBeBalance) { if (Config.disable_disk_balance) { LOG.info("disk balance is disabled. 
skip selecting tablets for disk balance"); return; } - List<TabletSchedCtx> diskBalanceTablets = Lists.newArrayList(); - // if default rebalancer can not get new task or user given prio BEs, then use disk rebalancer to get task - if (diskRebalancer.hasPrioBackends() || alternativeTablets.isEmpty()) { - diskBalanceTablets = diskRebalancer.selectAlternativeTablets(); + + int limit = getBalanceSchedQuotoLeft(); + if (limit <= 0) { + return; } - for (TabletSchedCtx tabletCtx : diskBalanceTablets) { + + int addNum = 0; + for (TabletSchedCtx tabletCtx : diskRebalancer.selectAlternativeTablets()) { + if (addNum >= limit) { + break; + } // add if task from prio backend or cluster is balanced - if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) { + if (!hasBeBalance || Config.be_rebalancer_idle_seconds <= 0 + || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) { if (addTablet(tabletCtx, false) == AddResult.ADDED) { - needAddBalanceNum--; - if (needAddBalanceNum <= 0) { - break; - } + addNum++; } } } } + private int getBalanceSchedQuotoLeft() { + // No need to prefetch too many balance task to pending queue. + // Because for every sched, it will re select the balance task. + return Math.min(Config.schedule_batch_size - getPendingNum(), + Config.max_balancing_tablets - getBalanceTabletsNumber()); + } + /** * Try to create a balance task for a tablet. 
*/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java index 12bf2aba169..1bec8a5d7a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendLoadStatisticProcNode.java @@ -27,7 +27,7 @@ public class BackendLoadStatisticProcNode implements ProcNodeInterface { public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() .add("RootPath").add("PathHash").add("StorageMedium") .add("DataUsedCapacity").add("TotalCapacity").add("TotalUsedPct") - .add("Class").add("State") + .add("ClassInOneBE").add("ClassInAllBE").add("State") .build(); private final LoadStatisticForTag statistic; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java index 29eafabf158..1c623effc10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatisticProcDir.java @@ -29,7 +29,7 @@ import com.google.common.collect.ImmutableList; // show proc "/cluster_balance/cluster_load_stat/location_default/HDD"; public class ClusterLoadStatisticProcDir implements ProcDirInterface { public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() - .add("BeId").add("Available").add("UsedCapacity").add("Capacity") + .add("BeId").add("Available").add("UsedCapacity").add("Capacity").add("MaxDisk") .add("UsedPercent").add("ReplicaNum").add("CapCoeff").add("ReplCoeff").add("Score") .add("Class") .build(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java index 81fdaafc969..05abfacdce0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.clone.BackendLoadStatistic.Classification; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -51,24 +52,28 @@ public class ClusterLoadStatisticsTest { @Before public void setUp() { // be1 + // 50%, 95%, 2% be1 = new Backend(10001, "192.168.0.1", 9051); Map<String, DiskInfo> disks = Maps.newHashMap(); DiskInfo diskInfo1 = new DiskInfo("/path1"); - diskInfo1.setTotalCapacityB(1000000); - diskInfo1.setAvailableCapacityB(500000); - diskInfo1.setDataUsedCapacityB(480000); + diskInfo1.setTotalCapacityB(1_000_000); + diskInfo1.setAvailableCapacityB(500_000); + diskInfo1.setDataUsedCapacityB(480_000); + diskInfo1.setPathHash(1001); disks.put(diskInfo1.getRootPath(), diskInfo1); DiskInfo diskInfo2 = new DiskInfo("/path2"); - diskInfo2.setTotalCapacityB(2000000); - diskInfo2.setAvailableCapacityB(100000); - diskInfo2.setDataUsedCapacityB(80000); + diskInfo2.setTotalCapacityB(2_000_000); + diskInfo2.setAvailableCapacityB(100_000); + diskInfo2.setDataUsedCapacityB(80_000); + diskInfo2.setPathHash(1002); disks.put(diskInfo2.getRootPath(), diskInfo2); DiskInfo diskInfo3 = new DiskInfo("/path3"); - diskInfo3.setTotalCapacityB(500000); - diskInfo3.setAvailableCapacityB(490000); - diskInfo3.setDataUsedCapacityB(10000); + diskInfo3.setTotalCapacityB(500_000); + diskInfo3.setAvailableCapacityB(490_000); + diskInfo3.setDataUsedCapacityB(10_000); + diskInfo3.setPathHash(1003); 
disks.put(diskInfo3.getRootPath(), diskInfo3); be1.setDisks(ImmutableMap.copyOf(disks)); @@ -78,15 +83,17 @@ public class ClusterLoadStatisticsTest { be2 = new Backend(10002, "192.168.0.2", 9052); disks = Maps.newHashMap(); diskInfo1 = new DiskInfo("/path1"); - diskInfo1.setTotalCapacityB(2000000); - diskInfo1.setAvailableCapacityB(1900000); - diskInfo1.setDataUsedCapacityB(480000); + diskInfo1.setTotalCapacityB(2_000_000); + diskInfo1.setAvailableCapacityB(1_900_000); + diskInfo1.setDataUsedCapacityB(480_000); + diskInfo1.setPathHash(2001); disks.put(diskInfo1.getRootPath(), diskInfo1); diskInfo2 = new DiskInfo("/path2"); - diskInfo2.setTotalCapacityB(20000000); - diskInfo2.setAvailableCapacityB(1000000); - diskInfo2.setDataUsedCapacityB(80000); + diskInfo2.setTotalCapacityB(20_000_000); + diskInfo2.setAvailableCapacityB(1_000_000); + diskInfo2.setDataUsedCapacityB(80_000); + diskInfo2.setPathHash(2002); disks.put(diskInfo2.getRootPath(), diskInfo2); be2.setDisks(ImmutableMap.copyOf(disks)); @@ -96,21 +103,24 @@ public class ClusterLoadStatisticsTest { be3 = new Backend(10003, "192.168.0.3", 9053); disks = Maps.newHashMap(); diskInfo1 = new DiskInfo("/path1"); - diskInfo1.setTotalCapacityB(4000000); - diskInfo1.setAvailableCapacityB(100000); - diskInfo1.setDataUsedCapacityB(80000); + diskInfo1.setTotalCapacityB(4_000_000); + diskInfo1.setAvailableCapacityB(100_000); + diskInfo1.setDataUsedCapacityB(80_000); + diskInfo1.setPathHash(3001); disks.put(diskInfo1.getRootPath(), diskInfo1); diskInfo2 = new DiskInfo("/path2"); - diskInfo2.setTotalCapacityB(2000000); - diskInfo2.setAvailableCapacityB(100000); - diskInfo2.setDataUsedCapacityB(80000); + diskInfo2.setTotalCapacityB(2_000_000); + diskInfo2.setAvailableCapacityB(100_000); + diskInfo2.setDataUsedCapacityB(80_000); + diskInfo2.setPathHash(3002); disks.put(diskInfo2.getRootPath(), diskInfo2); diskInfo3 = new DiskInfo("/path3"); - diskInfo3.setTotalCapacityB(500000); - diskInfo3.setAvailableCapacityB(490000); - 
diskInfo3.setDataUsedCapacityB(10000); + diskInfo3.setTotalCapacityB(500_000); + diskInfo3.setAvailableCapacityB(490_000); + diskInfo3.setDataUsedCapacityB(10_000); + diskInfo3.setPathHash(3003); disks.put(diskInfo3.getRootPath(), diskInfo3); be3.setDisks(ImmutableMap.copyOf(disks)); @@ -120,9 +130,9 @@ public class ClusterLoadStatisticsTest { be4 = new Backend(10004, "192.168.0.4", 9053); disks = Maps.newHashMap(); diskInfo1 = new DiskInfo("/path1"); - diskInfo1.setTotalCapacityB(4000000); - diskInfo1.setAvailableCapacityB(100000); - diskInfo1.setDataUsedCapacityB(80000); + diskInfo1.setTotalCapacityB(4_000_000); + diskInfo1.setAvailableCapacityB(100_000); + diskInfo1.setDataUsedCapacityB(80_000); disks.put(diskInfo1.getRootPath(), diskInfo1); be4.setDisks(ImmutableMap.copyOf(disks)); @@ -161,6 +171,14 @@ public class ClusterLoadStatisticsTest { loadStatistic.init(); List<List<String>> infos = loadStatistic.getStatistic(TStorageMedium.HDD); Assert.assertEquals(3, infos.size()); + BackendLoadStatistic beStat1 = loadStatistic.getBackendLoadStatistic(be1.getId()); + Assert.assertNotNull(beStat1); + RootPathLoadStatistic path2 = beStat1.getPathStatisticByPathHash(1002); + RootPathLoadStatistic path3 = beStat1.getPathStatisticByPathHash(1003); + Assert.assertEquals(Classification.HIGH, path2.getLocalClazz()); + Assert.assertEquals(Classification.HIGH, path2.getGlobalClazz()); + Assert.assertEquals(Classification.LOW, path3.getLocalClazz()); + Assert.assertEquals(Classification.LOW, path3.getGlobalClazz()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org