This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d97be13ba7624d052073a0924c478f2dedcc353b Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Thu Aug 17 22:30:49 2023 +0800 [improvement](tablet clone) improve tablet balance, scaling speed etc (#22317) --- .../main/java/org/apache/doris/common/Config.java | 10 +- .../apache/doris/clone/BackendLoadStatistic.java | 112 +++++++++- .../org/apache/doris/clone/BeLoadRebalancer.java | 56 +++-- .../org/apache/doris/clone/DiskRebalancer.java | 20 +- .../apache/doris/clone/LoadStatisticForTag.java | 4 + .../apache/doris/clone/PartitionRebalancer.java | 12 +- .../java/org/apache/doris/clone/Rebalancer.java | 11 +- .../apache/doris/clone/RootPathLoadStatistic.java | 8 +- .../org/apache/doris/clone/TabletSchedCtx.java | 70 +++++-- .../org/apache/doris/clone/TabletScheduler.java | 228 ++++++++++++++------- .../java/org/apache/doris/common/FeConstants.java | 2 + .../common/proc/TabletSchedulerDetailProcDir.java | 2 +- .../main/java/org/apache/doris/task/CloneTask.java | 19 +- .../org/apache/doris/clone/DecommissionTest.java | 174 ++++++++++++++++ .../org/apache/doris/clone/DiskRebalanceTest.java | 36 ++-- .../java/org/apache/doris/clone/RebalanceTest.java | 6 +- .../org/apache/doris/clone/RebalancerTestUtil.java | 23 +++ .../doris/clone/RootPathLoadStatisticTest.java | 14 +- .../doris/clone/TabletRepairAndBalanceTest.java | 27 +-- .../java/org/apache/doris/task/AgentTaskTest.java | 3 +- 20 files changed, 638 insertions(+), 199 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 3578c38726..7fd3695398 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -934,17 +934,19 @@ public class Config extends ConfigBase { public static long tablet_repair_delay_factor_second = 60; /** - * the default slot number per path in tablet scheduler + * the default slot number per path for hdd in tablet scheduler * TODO(cmy): remove this config and dynamically adjust it by clone task statistic */ @ConfField(mutable = true, masterOnly = true) - public static int schedule_slot_num_per_path = 4; + public static int schedule_slot_num_per_hdd_path = 4; + /** - * the default slot number per path in tablet scheduler for decommission backend + * the default slot number per path for ssd in tablet scheduler + * TODO(cmy): remove this config and dynamically adjust it by clone task statistic */ @ConfField(mutable = true, masterOnly = true) - public static int schedule_decommission_slot_num_per_path = 8; + public static int schedule_slot_num_per_ssd_path = 8; /** * the default batch size in tablet scheduler for a single schedule. diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java index fdf45afe81..6263f53ebe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java @@ -68,6 +68,70 @@ public class BackendLoadStatistic { } } + public static class BePathLoadStatPair { + private BackendLoadStatistic beLoadStatistic; + private RootPathLoadStatistic pathLoadStatistic; + + BePathLoadStatPair(BackendLoadStatistic beLoadStatistic, RootPathLoadStatistic pathLoadStatistic) { + this.beLoadStatistic = beLoadStatistic; + this.pathLoadStatistic = pathLoadStatistic; + } + + BackendLoadStatistic getBackendLoadStatistic() { + return beLoadStatistic; + } + + RootPathLoadStatistic getPathLoadStatistic() { + return pathLoadStatistic; + } + + @Override + public String toString() { + return "{ beId: " + beLoadStatistic.getBeId() + ", be score: " + + beLoadStatistic.getLoadScore(pathLoadStatistic.getStorageMedium()) + + ", path: " + pathLoadStatistic.getPath() + + ", path used percent: " + pathLoadStatistic.getUsedPercent() + + " }"; + } + } + + public static class BePathLoadStatPairComparator implements Comparator<BePathLoadStatPair> { + private double avgBackendLoadScore; + private double avgPathUsedPercent; + + BePathLoadStatPairComparator(List<BePathLoadStatPair> loadStats) { + avgBackendLoadScore = 0.0; + avgPathUsedPercent = 0.0; + for (BePathLoadStatPair loadStat : loadStats) { + RootPathLoadStatistic pathStat = loadStat.getPathLoadStatistic(); + avgBackendLoadScore += loadStat.getBackendLoadStatistic().getLoadScore(pathStat.getStorageMedium()); + avgPathUsedPercent += pathStat.getUsedPercent(); + } + if (!loadStats.isEmpty()) { + avgPathUsedPercent /= loadStats.size(); + avgBackendLoadScore /= loadStats.size(); + } + if (avgBackendLoadScore == 0.0) { + avgBackendLoadScore = 1.0; + } + if (avgPathUsedPercent == 0.0) { + avgPathUsedPercent = 1.0; + } + } + + @Override + public int compare(BePathLoadStatPair o1, BePathLoadStatPair o2) { + return Double.compare(getCompareValue(o1), getCompareValue(o2)); + } + + private double getCompareValue(BePathLoadStatPair loadStat) { + BackendLoadStatistic beStat = loadStat.getBackendLoadStatistic(); + RootPathLoadStatistic pathStat = loadStat.getPathLoadStatistic(); + return 0.5 * beStat.getLoadScore(pathStat.getStorageMedium()) / avgBackendLoadScore + + 0.5 * pathStat.getUsedPercent() / avgPathUsedPercent; + } + } + public static final BeStatComparator HDD_COMPARATOR = new BeStatComparator(TStorageMedium.HDD); public static final BeStatComparator SSD_COMPARATOR = new BeStatComparator(TStorageMedium.SSD); public static final BeStatMixComparator MIX_COMPARATOR = new BeStatMixComparator(); @@ -362,9 +426,9 @@ public class BackendLoadStatistic { } result.add(pathStatistic); - return BalanceStatus.OK; } - return status; + + return result.isEmpty() ? status : BalanceStatus.OK; } /** @@ -456,6 +520,50 @@ public class BackendLoadStatistic { beId, low.size(), mid.size(), high.size()); } + public void getPathStatisticByClass(List<RootPathLoadStatistic> low, + List<RootPathLoadStatistic> mid, List<RootPathLoadStatistic> high, TStorageMedium storageMedium) { + for (RootPathLoadStatistic pathStat : pathStatistics) { + if (pathStat.getDiskState() == DiskState.OFFLINE + || (storageMedium != null && pathStat.getStorageMedium() != storageMedium)) { + continue; + } + + if (pathStat.getClazz() == Classification.LOW) { + low.add(pathStat); + } else if (pathStat.getClazz() == Classification.HIGH) { + high.add(pathStat); + } else { + mid.add(pathStat); + } + } + + LOG.debug("after adjust, backend {} path classification low/mid/high: {}/{}/{}", + beId, low.size(), mid.size(), high.size()); + } + + public void incrPathsCopingSize(Map<Long, Long> pathsCopingSize) { + boolean updated = false; + for (RootPathLoadStatistic pathStat : pathStatistics) { + Long copingSize = pathsCopingSize.get(pathStat.getPathHash()); + if (copingSize != null && copingSize > 0) { + pathStat.incrCopingSizeB(copingSize); + updated = true; + } + } + if (updated) { + Collections.sort(pathStatistics); + } + } + + public void incrPathCopingSize(long pathHash, long copingSize) { + RootPathLoadStatistic pathStat = pathStatistics.stream().filter( + p -> p.getPathHash() == pathHash).findFirst().orElse(null); + if (pathStat != null) { + pathStat.incrCopingSizeB(copingSize); + Collections.sort(pathStatistics); + } + } + public List<RootPathLoadStatistic> getPathStatistics() { return pathStatistics; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 5317725881..d388e5fd79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -22,6 +22,8 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair; +import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator; import org.apache.doris.clone.SchedException.Status; import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletSchedCtx.Priority; @@ -51,8 +53,9 @@ import java.util.Set; public class BeLoadRebalancer extends Rebalancer { private static final Logger LOG = LogManager.getLogger(BeLoadRebalancer.class); - public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) { - super(infoService, invertedIndex); + public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex, + Map<Long, PathSlot> backendsWorkingSlots) { + super(infoService, invertedIndex, backendsWorkingSlots); } /* @@ -100,9 +103,16 @@ public class BeLoadRebalancer extends Rebalancer { return alternativeTablets; } - // get the number of low load paths. and we should at most select this number of tablets - long numOfLowPaths = lowBEs.stream().filter(b -> b.isAvailable() && b.hasAvailDisk()).mapToLong( - b -> b.getAvailPathNum(medium)).sum(); + long numOfLowPaths = 0; + for (BackendLoadStatistic backendLoadStatistic : lowBEs) { + if (!backendLoadStatistic.isAvailable()) { + continue; + } + PathSlot pathSlot = backendsWorkingSlots.get(backendLoadStatistic.getBeId()); + if (pathSlot != null) { + numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum(); + } + } LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); int clusterAvailableBEnum = infoService.getAllBackendIds(true).size(); @@ -113,6 +123,10 @@ public class BeLoadRebalancer extends Rebalancer { OUTER: for (int i = highBEs.size() - 1; i >= 0; i--) { BackendLoadStatistic beStat = highBEs.get(i); + PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId()); + if (pathSlot == null) { + continue; + } // classify the paths. Set<Long> pathLow = Sets.newHashSet(); @@ -129,7 +143,10 @@ public class BeLoadRebalancer extends Rebalancer { // for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets Map<Long, Integer> remainingPaths = Maps.newHashMap(); for (Long pathHash : pathHigh) { - remainingPaths.put(pathHash, Config.balance_slot_num_per_path); + int availBalanceNum = pathSlot.getAvailableBalanceNum(pathHash); + if (availBalanceNum > 0) { + remainingPaths.put(pathHash, availBalanceNum); + } } if (remainingPaths.isEmpty()) { @@ -201,8 +218,7 @@ public class BeLoadRebalancer extends Rebalancer { * 2. Select a low load backend as destination. And tablet should not has replica on this backend. */ @Override - public void completeSchedCtx(TabletSchedCtx tabletCtx, - Map<Long, PathSlot> backendsWorkingSlots) throws SchedException { + public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag()); if (clusterStat == null) { throw new SchedException(Status.UNRECOVERABLE, @@ -305,6 +321,7 @@ public class BeLoadRebalancer extends Rebalancer { throw new SchedException(Status.UNRECOVERABLE, "unable to find low backend"); } + List<BePathLoadStatPair> candFitPaths = Lists.newArrayList(); for (BackendLoadStatistic beStat : candidates) { PathSlot slot = backendsWorkingSlots.get(beStat.getBeId()); if (slot == null) { @@ -313,15 +330,26 @@ public class BeLoadRebalancer extends Rebalancer { // classify the paths. // And we only select path from 'low' and 'mid' paths - Set<Long> pathLow = Sets.newHashSet(); - Set<Long> pathMid = Sets.newHashSet(); - Set<Long> pathHigh = Sets.newHashSet(); + List<RootPathLoadStatistic> pathLow = Lists.newArrayList(); + List<RootPathLoadStatistic> pathMid = Lists.newArrayList(); + List<RootPathLoadStatistic> pathHigh = Lists.newArrayList(); beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium()); + pathLow.addAll(pathMid); + pathLow.stream().forEach(path -> candFitPaths.add(new BePathLoadStatPair(beStat, path))); + } - long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow); - if (pathHash != -1) { - tabletCtx.setDest(beStat.getBeId(), pathHash); + BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(candFitPaths); + Collections.sort(candFitPaths, comparator); + for (BePathLoadStatPair bePathLoadStat : candFitPaths) { + BackendLoadStatistic beStat = bePathLoadStat.getBackendLoadStatistic(); + RootPathLoadStatistic pathStat = bePathLoadStat.getPathLoadStatistic(); + PathSlot slot = backendsWorkingSlots.get(beStat.getBeId()); + if (slot == null) { + continue; + } + if (slot.takeBalanceSlot(pathStat.getPathHash()) != -1) { + tabletCtx.setDest(beStat.getBeId(), pathStat.getPathHash()); return; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index abac0c2d1a..fe835b94af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -24,7 +24,6 @@ import org.apache.doris.clone.SchedException.Status; import org.apache.doris.clone.TabletSchedCtx.BalanceType; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; -import org.apache.doris.common.Config; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -52,8 +51,9 @@ import java.util.Set; public class DiskRebalancer extends Rebalancer { private static final Logger LOG = LogManager.getLogger(DiskRebalancer.class); - public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) { - super(infoService, invertedIndex); + public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex, + Map<Long, PathSlot> backendsWorkingSlots) { + super(infoService, invertedIndex, backendsWorkingSlots); } public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) { @@ -152,6 +152,10 @@ public class DiskRebalancer extends Rebalancer { Collections.shuffle(midBEs); for (int i = midBEs.size() - 1; i >= 0; i--) { BackendLoadStatistic beStat = midBEs.get(i); + PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId()); + if (pathSlot == null) { + continue; + } // classify the paths. Set<Long> pathLow = Sets.newHashSet(); @@ -171,7 +175,10 @@ public class DiskRebalancer extends Rebalancer { // for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets Map<Long, Integer> remainingPaths = Maps.newHashMap(); for (Long pathHash : pathHigh) { - remainingPaths.put(pathHash, Config.balance_slot_num_per_path); + int availBalanceNum = pathSlot.getAvailableBalanceNum(pathHash); + if (availBalanceNum > 0) { + remainingPaths.put(pathHash, availBalanceNum); + } } if (remainingPaths.isEmpty()) { @@ -246,8 +253,7 @@ public class DiskRebalancer extends Rebalancer { * 3. Select a low load path from this backend as destination. */ @Override - public void completeSchedCtx(TabletSchedCtx tabletCtx, - Map<Long, PathSlot> backendsWorkingSlots) throws SchedException { + public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag()); if (clusterStat == null) { throw new SchedException(Status.UNRECOVERABLE, @@ -323,7 +329,7 @@ public class DiskRebalancer extends Rebalancer { } long destPathHash = slot.takeBalanceSlot(stat.getPathHash()); if (destPathHash == -1) { - throw new SchedException(Status.UNRECOVERABLE, "unable to take dest slot"); + continue; } tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath()); setDest = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java index 224399f480..413a3b129f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java @@ -342,6 +342,10 @@ public class LoadStatisticForTag { return null; } + public List<BackendLoadStatistic> getBackendLoadStatistics() { + return beLoadStatistics; + } + /* * If cluster is balance, all Backends will be in 'mid', and 'high' and 'low' is empty * If both 'high' and 'low' has Backends, just return diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index d9d3f27cc7..6c83944462 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -20,6 +20,7 @@ package org.apache.doris.clone; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.resource.Tag; @@ -63,8 +64,9 @@ public class PartitionRebalancer extends Rebalancer { private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0); private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0); - public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) { - super(infoService, invertedIndex); + public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex, + Map<Long, PathSlot> backendsWorkingSlots) { + super(infoService, invertedIndex, backendsWorkingSlots); } @Override @@ -229,7 +231,7 @@ public class PartitionRebalancer extends Rebalancer { } @Override - protected void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, TabletScheduler.PathSlot> backendsWorkingSlots) + protected void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException { MovesCacheMap.MovesCache movesInProgress = movesCacheMap.getCache(tabletCtx.getTag(), tabletCtx.getStorageMedium()); @@ -271,10 +273,10 @@ public class PartitionRebalancer extends Rebalancer { if (pathHash == -1) { throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT, "paths has no available balance slot: " + availPath); - } else { - tabletCtx.setDest(beStat.getBeId(), pathHash); } + tabletCtx.setDest(beStat.getBeId(), pathHash); + // ToDeleteReplica is the source replica pair.second = srcReplica.getId(); } catch (IllegalStateException | NullPointerException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java index 4c3ef57546..bee23747bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java @@ -49,14 +49,17 @@ public abstract class Rebalancer { // When Rebalancer init, the statisticMap is usually empty. So it's no need to be an arg. // Only use updateLoadStatistic() to load stats. protected Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap(); + protected Map<Long, PathSlot> backendsWorkingSlots; protected TabletInvertedIndex invertedIndex; protected SystemInfoService infoService; // be id -> end time of prio protected Map<Long, Long> prioBackends = Maps.newConcurrentMap(); - public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) { + public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex, + Map<Long, PathSlot> backendsWorkingSlots) { this.infoService = infoService; this.invertedIndex = invertedIndex; + this.backendsWorkingSlots = backendsWorkingSlots; } public List<TabletSchedCtx> selectAlternativeTablets() { @@ -74,9 +77,9 @@ public abstract class Rebalancer { protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster( LoadStatisticForTag clusterStat, TStorageMedium medium); - public AgentTask createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots) + public AgentTask createBalanceTask(TabletSchedCtx tabletCtx) throws SchedException { - completeSchedCtx(tabletCtx, backendsWorkingSlots); + completeSchedCtx(tabletCtx); if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) { return tabletCtx.createCloneReplicaAndTask(); } else { @@ -90,7 +93,7 @@ public abstract class Rebalancer { // You should check the moves' validation. // 2. If you want to generate {srcReplica, destBe} here, just do it. // 3. You should check the path slots of src & dest. - protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots) + protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException; public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java index 3aeb4069ab..1a51276f7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java @@ -31,6 +31,7 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic> private TStorageMedium storageMedium; private long capacityB; private long usedCapacityB; + private long copingSizeB; private DiskState diskState; private Classification clazz = Classification.INIT; @@ -43,6 +44,7 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic> this.storageMedium = storageMedium; this.capacityB = capacityB <= 0 ? 1 : capacityB; this.usedCapacityB = usedCapacityB; + this.copingSizeB = 0; this.diskState = diskState; } @@ -71,7 +73,11 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic> } public double getUsedPercent() { - return capacityB <= 0 ? 0.0 : usedCapacityB / (double) capacityB; + return capacityB <= 0 ? 0.0 : (usedCapacityB + copingSizeB) / (double) capacityB; + } + + public void incrCopingSizeB(long size) { + copingSizeB += size; } public void setClazz(Classification clazz) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 3c2d2b148c..4a3a1a6eb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -34,6 +34,7 @@ import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.resource.Tag; @@ -179,6 +180,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { private long visibleVersion = -1; private long committedVersion = -1; + private long tabletSize = 0; + private Replica srcReplica = null; private long srcPathHash = -1; // for disk balance to keep src path, and avoid take slot on selectAlternativeTablets @@ -281,6 +284,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { return failedSchedCounter; } + public void resetFailedSchedCounter() { + failedSchedCounter = 0; + } + public void increaseFailedRunningCounter() { ++failedRunningCounter; } @@ -301,7 +308,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { } else { decommissionTime = -1; if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) { - return failedSchedCounter > 30 * 1000 / TabletScheduler.SCHEDULE_INTERVAL_MS; + return failedSchedCounter > 30 * 1000 / FeConstants.tablet_schedule_interval_ms; } else { return failedSchedCounter > 10; } @@ -477,13 +484,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { // database lock should be held. public long getTabletSize() { - long max = Long.MIN_VALUE; - for (Replica replica : tablet.getReplicas()) { - if (replica.getDataSize() > max) { - max = replica.getDataSize(); - } - } - return max; + return tabletSize; + } + + public void updateTabletSize() { + tabletSize = 0; + tablet.getReplicas().stream().forEach( + replica -> tabletSize = Math.max(tabletSize, replica.getDataSize())); } /* @@ -905,6 +912,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { // if this is a balance task, or this is a repair task with // REPLICA_MISSING/REPLICA_RELOCATING, // we create a new replica with state CLONE + long replicaId = 0; if (tabletStatus == TabletStatus.REPLICA_MISSING || tabletStatus == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE || tabletStatus == TabletStatus.COLOCATE_MISMATCH @@ -917,14 +925,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { committedVersion, /* use committed version as last failed version */ -1 /* last success version */); - TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort()); - cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId, - tabletId, cloneReplica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium, - visibleVersion, (int) (taskTimeoutMs / 1000)); - cloneTask.setPathHash(srcPathHash, destPathHash); - // addReplica() method will add this replica to tablet inverted index too. tablet.addReplica(cloneReplica); + replicaId = cloneReplica.getId(); } else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) { Preconditions.checkState(type == Type.REPAIR, type); // double check @@ -937,18 +940,31 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { throw new SchedException(Status.SCHEDULE_FAILED, "dest replica's path hash is changed. " + "current: " + replica.getPathHash() + ", scheduled: " + destPathHash); } + replicaId = replica.getId(); + } + + TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort()); + TBackend tDestBe = new TBackend(destBe.getHost(), destBe.getBePort(), destBe.getHttpPort()); - TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort()); - cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId, - tabletId, replica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium, + cloneTask = new CloneTask(tDestBe, destBackendId, dbId, tblId, partitionId, indexId, tabletId, + replicaId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium, visibleVersion, (int) (taskTimeoutMs / 1000)); - cloneTask.setPathHash(srcPathHash, destPathHash); - } + cloneTask.setPathHash(srcPathHash, destPathHash); this.state = State.RUNNING; return cloneTask; } + // for storage migration or cloning a new replica + public long getDestEstimatedCopingSize() { + if ((cloneTask != null && tabletStatus != TabletStatus.VERSION_INCOMPLETE) + || storageMediaMigrationTask != null) { + return Math.max(getTabletSize(), 10L); + } else { + return 0; + } + } + // timeout is between MIN_CLONE_TASK_TIMEOUT_MS and MAX_CLONE_TASK_TIMEOUT_MS private long getApproximateTimeoutMs() { long tabletSize = getTabletSize(); @@ -1131,6 +1147,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { result.add(TimeUtils.longToTimeString(lastSchedTime)); result.add(TimeUtils.longToTimeString(lastVisitedTime)); result.add(TimeUtils.longToTimeString(finishedTime)); + Pair<Double, String> tabletSizeDesc = DebugUtil.getByteUint(tabletSize); + result.add(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(tabletSizeDesc.first) + " " + tabletSizeDesc.second); result.add(copyTimeMs > 0 ? String.valueOf(copySize / copyTimeMs / 1000.0) : FeConstants.null_string); result.add(String.valueOf(failedSchedCounter)); result.add(String.valueOf(failedRunningCounter)); @@ -1162,8 +1180,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { value += 5 * 1000L; } + // repair tasks always prior than balance if (type == Type.BALANCE) { - value += 30 * 60 * 1000L; + value += 10 * 24 * 3600L; } return value; @@ -1174,12 +1193,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { StringBuilder sb = new StringBuilder(); sb.append("tablet id: ").append(tabletId).append(", status: ").append(tabletStatus.name()); sb.append(", state: ").append(state.name()).append(", type: ").append(type.name()); + if (type == Type.BALANCE && balanceType != null) { + sb.append(", balance: ").append(balanceType.name()); + } + if (priority != null) { + sb.append(", priority: ").append(priority.name()); + } + sb.append(", tablet size: ").append(tabletSize); if (srcReplica != null) { - sb.append(". from backend: ").append(srcReplica.getBackendId()); + sb.append(", from backend: ").append(srcReplica.getBackendId()); sb.append(", src path hash: ").append(srcPathHash); } if (destPathHash != -1) { - sb.append(". to backend: ").append(destBackendId); + sb.append(", to backend: ").append(destBackendId); sb.append(", dest path hash: ").append(destPathHash); } sb.append(", visible version: ").append(visibleVersion); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 1bc2f54337..d791b53475 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.AdminRebalanceDiskStmt; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.ColocateTableIndex.GroupId; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.DiskInfo.DiskState; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; @@ -36,6 +35,8 @@ import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.Tablet.TabletStatus; import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair; +import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator; import org.apache.doris.clone.SchedException.Status; import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletSchedCtx.Priority; @@ -58,6 +59,7 @@ import org.apache.doris.task.DropReplicaTask; import org.apache.doris.task.StorageMediaMigrationTask; import org.apache.doris.thrift.TFinishTaskRequest; import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionState; @@ -72,12 +74,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; -import java.util.stream.Collectors; /** * TabletScheduler saved the tablets produced by TabletChecker and try to schedule them. @@ -103,8 +105,6 @@ public class TabletScheduler extends MasterDaemon { // the minimum interval of updating cluster statistics and priority of tablet info private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s - public static final long SCHEDULE_INTERVAL_MS = 100; - /* * Tablet is added to pendingTablets as well it's id in allTabletTypes. * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletTypes when @@ -127,12 +127,11 @@ public class TabletScheduler extends MasterDaemon { private Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap(); // Tag -> load statistic private Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap(); + private long lastStatUpdateTime = 0; private long lastSlotAdjustTime = 0; - private long lastCheckTimeoutTime = 0; - private Env env; private SystemInfoService infoService; private TabletInvertedIndex invertedIndex; @@ -151,19 +150,19 @@ public class TabletScheduler extends MasterDaemon { public TabletScheduler(Env env, SystemInfoService infoService, TabletInvertedIndex invertedIndex, TabletSchedulerStat stat, String rebalancerType) { - super("tablet scheduler", SCHEDULE_INTERVAL_MS); + super("tablet scheduler", FeConstants.tablet_schedule_interval_ms); this.env = env; this.infoService = infoService; this.invertedIndex = invertedIndex; this.colocateTableIndex = env.getColocateTableIndex(); this.stat = stat; if (rebalancerType.equalsIgnoreCase("partition")) { - this.rebalancer = new PartitionRebalancer(infoService, invertedIndex); + this.rebalancer = new PartitionRebalancer(infoService, invertedIndex, backendsWorkingSlots); } else { - this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex); + this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex, backendsWorkingSlots); } // if rebalancer can not get new task, then use diskRebalancer to get task - this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex); + this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex, backendsWorkingSlots); } public TabletSchedulerStat getStat() { @@ -190,10 +189,11 @@ public class TabletScheduler extends MasterDaemon { Set<Long> deletedBeIds = Sets.newHashSet(); for (Long beId : backendsWorkingSlots.keySet()) { if (backends.containsKey(beId)) { - List<Long> pathHashes = backends.get(beId).getDisks().values().stream() + Map<Long, TStorageMedium> paths = Maps.newHashMap(); + backends.get(beId).getDisks().values().stream() .filter(v -> v.getState() == DiskState.ONLINE) - .map(DiskInfo::getPathHash).collect(Collectors.toList()); - backendsWorkingSlots.get(beId).updatePaths(pathHashes); + .forEach(v -> paths.put(v.getPathHash(), v.getStorageMedium())); + backendsWorkingSlots.get(beId).updatePaths(paths); } else { deletedBeIds.add(beId); } @@ -208,9 +208,11 @@ public class TabletScheduler extends MasterDaemon { // add new backends for (Backend be : backends.values()) { if (!backendsWorkingSlots.containsKey(be.getId())) { - List<Long> pathHashes = be.getDisks().values().stream() - .map(DiskInfo::getPathHash).collect(Collectors.toList()); - PathSlot slot = new PathSlot(pathHashes, be.getId()); + Map<Long, TStorageMedium> paths = Maps.newHashMap(); + be.getDisks().values().stream() + .filter(v -> v.getState() == DiskState.ONLINE) + .forEach(v -> paths.put(v.getPathHash(), v.getStorageMedium())); + PathSlot slot = new PathSlot(paths, be.getId()); backendsWorkingSlots.put(be.getId(), slot); LOG.info("add new backend {} with slots num: {}", be.getId(), be.getDisks().size()); } @@ -261,9 +263,7 @@ public class TabletScheduler extends MasterDaemon { pendingTablets.offer(tablet); if (!contains) { - LOG.info("Add tablet to pending queue, tablet id {}, type {}, status {}, priority {}", - tablet.getTabletId(), tablet.getType(), tablet.getTabletStatus(), - tablet.getPriority()); + LOG.info("Add tablet to pending queue, {}", tablet); } return AddResult.ADDED; @@ -319,24 +319,16 @@ public class TabletScheduler extends MasterDaemon { return; } - if (System.currentTimeMillis() - lastCheckTimeoutTime >= 1000L) { - updateLoadStatisticsAndPriorityIfNecessary(); - handleRunningTablets(); - selectTabletsForBalance(); - lastCheckTimeoutTime = System.currentTimeMillis(); - } - + updateLoadStatistics(); + handleRunningTablets(); + selectTabletsForBalance(); schedulePendingTablets(); stat.counterTabletScheduleRound.incrementAndGet(); } - private void updateLoadStatisticsAndPriorityIfNecessary() { - if (System.currentTimeMillis() - lastStatUpdateTime < STAT_UPDATE_INTERVAL_MS) { - return; - } - + private void updateLoadStatistics() { updateLoadStatistic(); rebalancer.updateLoadStatistic(statisticMap); diskRebalancer.updateLoadStatistic(statisticMap); @@ -359,6 +351,12 @@ public class TabletScheduler extends MasterDaemon { newStatisticMap.put(tag, loadStatistic); LOG.debug("update load statistic for tag {}:\n{}", tag, loadStatistic.getBrief()); } + Map<Long, Long> pathsCopingSize = getPathsCopingSize(); + for (LoadStatisticForTag loadStatistic : newStatisticMap.values()) { + for (BackendLoadStatistic beLoadStatistic : loadStatistic.getBackendLoadStatistics()) { + beLoadStatistic.incrPathsCopingSize(pathsCopingSize); + } + } this.statisticMap = newStatisticMap; } @@ -584,6 +582,7 @@ public class TabletScheduler extends MasterDaemon { // we do not concern priority here. // once we take the tablet out of priority queue, priority is meaningless. tabletCtx.setTablet(tablet); + tabletCtx.updateTabletSize(); tabletCtx.setVersionInfo(partition.getVisibleVersion(), partition.getCommittedVersion()); tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId())); tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium()); @@ -691,6 +690,7 @@ public class TabletScheduler extends MasterDaemon { // create clone task batchTask.addTask(tabletCtx.createCloneReplicaAndTask()); + incrDestPathCopingSize(tabletCtx); } // In dealing with the case of missing replicas, we need to select a tag with missing replicas @@ -782,6 +782,7 @@ public class TabletScheduler extends MasterDaemon { private void handleReplicaRelocating(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException { stat.counterReplicaUnavailableErr.incrementAndGet(); + tabletCtx.setTabletStatus(TabletStatus.VERSION_INCOMPLETE); handleReplicaVersionIncomplete(tabletCtx, batchTask); } @@ -1202,6 +1203,7 @@ public class TabletScheduler extends MasterDaemon { // create clone task batchTask.addTask(tabletCtx.createCloneReplicaAndTask()); + incrDestPathCopingSize(tabletCtx); } /** @@ -1214,16 +1216,23 @@ public class TabletScheduler extends MasterDaemon { return; } - long numOfBalancingTablets = getBalanceTabletsNumber(); - if (numOfBalancingTablets > Config.max_balancing_tablets) { - LOG.info("number of balancing tablets {} exceed limit: {}, skip selecting tablets for balance", - numOfBalancingTablets, Config.max_balancing_tablets); + // No need to prefetch too many balance task to pending queue. + // Because for every sched, it will re select the balance task. + int needAddBalanceNum = Math.min(Config.schedule_batch_size - getPendingNum(), + Config.max_balancing_tablets - getBalanceTabletsNumber()); + if (needAddBalanceNum <= 0) { return; } List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets(); + Collections.shuffle(alternativeTablets); for (TabletSchedCtx tabletCtx : alternativeTablets) { - addTablet(tabletCtx, false); + if (addTablet(tabletCtx, false) == AddResult.ADDED) { + needAddBalanceNum--; + if (needAddBalanceNum <= 0) { + return; + } + } } if (Config.disable_disk_balance) { LOG.info("disk balance is disabled. skip selecting tablets for disk balance"); @@ -1237,7 +1246,12 @@ public class TabletScheduler extends MasterDaemon { for (TabletSchedCtx tabletCtx : diskBalanceTablets) { // add if task from prio backend or cluster is balanced if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) { - addTablet(tabletCtx, false); + if (addTablet(tabletCtx, false) == AddResult.ADDED) { + needAddBalanceNum--; + if (needAddBalanceNum <= 0) { + break; + } + } } } } @@ -1249,16 +1263,17 @@ public class TabletScheduler extends MasterDaemon { stat.counterBalanceSchedule.incrementAndGet(); AgentTask task = null; if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) { - task = diskRebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots); + task = diskRebalancer.createBalanceTask(tabletCtx); checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash()); checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash()); } else if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) { - task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots); + task = rebalancer.createBalanceTask(tabletCtx); } else { throw new SchedException(Status.UNRECOVERABLE, "unknown balance type: " + tabletCtx.getBalanceType().toString()); } batchTask.addTask(task); + incrDestPathCopingSize(tabletCtx); } // choose a path on a backend which is fit for the tablet @@ -1294,7 +1309,7 @@ public class TabletScheduler extends MasterDaemon { // get all available paths which this tablet can fit in. // beStatistics is sorted by mix load score in ascend order, so select from first to last. - List<RootPathLoadStatistic> allFitPaths = Lists.newArrayList(); + List<BePathLoadStatPair> allFitPaths = Lists.newArrayList(); for (BackendLoadStatistic bes : beStatistics) { if (!bes.isAvailable()) { LOG.debug("backend {} is not available, skip. tablet: {}", bes.getBeId(), tabletCtx.getTabletId()); @@ -1343,18 +1358,21 @@ public class TabletScheduler extends MasterDaemon { } } - Preconditions.checkState(resultPaths.size() == 1); - allFitPaths.add(resultPaths.get(0)); + resultPaths.stream().forEach(path -> allFitPaths.add(new BePathLoadStatPair(bes, path))); } if (allFitPaths.isEmpty()) { throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path for new replica"); } + BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(allFitPaths); + Collections.sort(allFitPaths, comparator); + // all fit paths has already been sorted by load score in 'allFitPaths' in ascend order. // just get first available path. // we try to find a path with specified media type, if not find, arbitrarily use one. - for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) { + for (BePathLoadStatPair bePathLoadStat : allFitPaths) { + RootPathLoadStatistic rootPathLoadStatistic = bePathLoadStat.getPathLoadStatistic(); if (rootPathLoadStatistic.getStorageMedium() != tabletCtx.getStorageMedium()) { LOG.debug("backend {}'s path {}'s storage medium {} " + "is not equal to tablet's storage medium {}, skip. tablet: {}", @@ -1385,7 +1403,8 @@ public class TabletScheduler extends MasterDaemon { boolean hasBePath = false; // no root path with specified media type is found, get arbitrary one. - for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) { + for (BePathLoadStatPair bePathLoadStat : allFitPaths) { + RootPathLoadStatistic rootPathLoadStatistic = bePathLoadStat.getPathLoadStatistic(); PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId()); if (slot == null) { LOG.debug("backend {}'s path {}'s slot is null, skip. tablet: {}", @@ -1622,7 +1641,10 @@ public class TabletScheduler extends MasterDaemon { tabletCtx.increaseFailedRunningCounter(); if (!tabletCtx.isExceedFailedRunningLimit()) { stat.counterCloneTaskFailed.incrementAndGet(); - addToRunningTablets(tabletCtx); + tabletCtx.releaseResource(this); + tabletCtx.resetFailedSchedCounter(); + tabletCtx.setState(TabletSchedCtx.State.PENDING); + addBackToPendingTablets(tabletCtx); return false; } else { // unrecoverable @@ -1767,9 +1789,42 @@ public class TabletScheduler extends MasterDaemon { return allTabletTypes.size(); } - public synchronized long getBalanceTabletsNumber() { - return pendingTablets.stream().filter(t -> t.getType() == Type.BALANCE).count() - + runningTablets.values().stream().filter(t -> t.getType() == Type.BALANCE).count(); + public synchronized int getBalanceTabletsNumber() { + return (int) (pendingTablets.stream().filter(t -> t.getType() == Type.BALANCE).count() + + runningTablets.values().stream().filter(t -> t.getType() == Type.BALANCE).count()); + } + + private synchronized Map<Long, Long> getPathsCopingSize() { + Map<Long, Long> pathsCopingSize = Maps.newHashMap(); + for (TabletSchedCtx tablet : runningTablets.values()) { + long pathHash = tablet.getDestPathHash(); + if (pathHash == 0 || pathHash == -1) { + continue; + } + + long copingSize = tablet.getDestEstimatedCopingSize(); + if (copingSize > 0) { + Long size = pathsCopingSize.getOrDefault(pathHash, 0L); + pathsCopingSize.put(pathHash, size + copingSize); + } + } + return pathsCopingSize; + } + + private void incrDestPathCopingSize(TabletSchedCtx tablet) { + long destPathHash = tablet.getDestPathHash(); + if (destPathHash == -1 || destPathHash == 0) { + return; + } + + for (LoadStatisticForTag loadStatistic : statisticMap.values()) { + BackendLoadStatistic beLoadStatistic = loadStatistic.getBackendLoadStatistics().stream() + .filter(v -> v.getBeId() == tablet.getDestBackendId()).findFirst().orElse(null); + if (beLoadStatistic != null) { + beLoadStatistic.incrPathCopingSize(destPathHash, tablet.getDestEstimatedCopingSize()); + break; + } + } } /** @@ -1782,22 +1837,22 @@ public class TabletScheduler extends MasterDaemon { private Map<Long, Slot> pathSlots = Maps.newConcurrentMap(); private long beId; - public PathSlot(List<Long> paths, long beId) { + public PathSlot(Map<Long, TStorageMedium> paths, long beId) { this.beId = beId; - for (Long pathHash : paths) { - pathSlots.put(pathHash, new Slot(beId)); + for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) { + pathSlots.put(entry.getKey(), new Slot(entry.getValue())); } } // update the path - public synchronized void updatePaths(List<Long> paths) { + public synchronized void updatePaths(Map<Long, TStorageMedium> paths) { // delete non exist path - pathSlots.entrySet().removeIf(entry -> !paths.contains(entry.getKey())); + pathSlots.entrySet().removeIf(entry -> !paths.containsKey(entry.getKey())); // add new path - for (Long pathHash : paths) { - if (!pathSlots.containsKey(pathHash)) { - pathSlots.put(pathHash, new Slot(beId)); + for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) { + if (!pathSlots.containsKey(entry.getKey())) { + pathSlots.put(entry.getKey(), new Slot(entry.getValue())); } } } @@ -1829,6 +1884,20 @@ public class TabletScheduler extends MasterDaemon { return true; } + public synchronized boolean hasAvailableBalanceSlot(long pathHash) { + if (pathHash == -1) { + return false; + } + Slot slot = pathSlots.get(pathHash); + if (slot == null) { + return false; + } + if (slot.getAvailableBalance() == 0) { + return false; + } + return true; + } + /** * If the specified 'pathHash' has available slot, decrease the slot number and return this path hash */ @@ -1872,27 +1941,27 @@ public class TabletScheduler extends MasterDaemon { return total; } + public synchronized int getTotalAvailBalanceSlotNum() { + int num = 0; + for (Slot slot : pathSlots.values()) { + num += slot.getAvailableBalance(); + } + return num; + } + /** * get path whose balance slot num is larger than 0 */ public synchronized Set<Long> getAvailPathsForBalance() { Set<Long> pathHashs = Sets.newHashSet(); for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) { - if (entry.getValue().getBalanceAvailable() > 0) { + if (entry.getValue().getAvailableBalance() > 0) { pathHashs.add(entry.getKey()); } } return pathHashs; } - public synchronized int getAvailBalanceSlotNum() { - int num = 0; - for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) { - num += entry.getValue().getBalanceAvailable(); - } - return num; - } - public synchronized List<List<String>> getSlotInfo(long beId) { List<List<String>> results = Lists.newArrayList(); pathSlots.forEach((key, value) -> { @@ -1901,13 +1970,18 @@ public class TabletScheduler extends MasterDaemon { result.add(String.valueOf(key)); result.add(String.valueOf(value.getAvailable())); result.add(String.valueOf(value.getTotal())); - result.add(String.valueOf(value.getBalanceAvailable())); + result.add(String.valueOf(value.getAvailableBalance())); result.add(String.valueOf(value.getAvgRate())); results.add(result); }); return results; } + public synchronized int getAvailableBalanceNum(long pathHash) { + Slot slot = pathSlots.get(pathHash); + return slot != null ? slot.getAvailableBalance() : 0; + } + public synchronized long takeBalanceSlot(long pathHash) { Slot slot = pathSlots.get(pathHash); if (slot == null) { @@ -1980,10 +2054,10 @@ public class TabletScheduler extends MasterDaemon { // for disk balance public long diskBalanceLastSuccTime = 0; - private long beId; + private TStorageMedium storageMedium; - public Slot(long beId) { - this.beId = beId; + public Slot(TStorageMedium storageMedium) { + this.storageMedium = storageMedium; this.used = 0; this.balanceUsed = 0; } @@ -1993,18 +2067,16 @@ public class TabletScheduler extends MasterDaemon { } public int getTotal() { - int total = Math.max(1, Config.schedule_slot_num_per_path); - - Backend be = Env.getCurrentSystemInfo().getBackend(beId); - if (be != null && be.isDecommissioned()) { - total = Math.max(1, Config.schedule_decommission_slot_num_per_path); + if (storageMedium == TStorageMedium.SSD) { + return Config.schedule_slot_num_per_ssd_path; + } else { + return Config.schedule_slot_num_per_hdd_path; } - - return total; } - public int getBalanceAvailable() { - return Math.max(0, getBalanceTotal() - balanceUsed); + public int getAvailableBalance() { + int leftBalance = Math.max(0, getBalanceTotal() - balanceUsed); + return Math.min(leftBalance, getAvailable()); } public int getBalanceTotal() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 59f3efa32f..789315c33f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -67,6 +67,8 @@ public class FeConstants { public static String null_string = "\\N"; public static long tablet_checker_interval_ms = 20 * 1000L; + public static long tablet_schedule_interval_ms = 100L; + public static String csv = "csv"; public static String csv_with_names = "csv_with_names"; public static String csv_with_names_and_types = "csv_with_names_and_types"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java index 4441a99431..67e24870f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java @@ -37,7 +37,7 @@ public class TabletSchedulerDetailProcDir implements ProcDirInterface { public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("TabletId") .add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe") .add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit") - .add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer") + .add("Finished").add("ReplicaSize").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer") .add("CmtVer").add("ErrMsg") .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java index 019fded640..60531ced30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java @@ -33,6 +33,7 @@ public class CloneTask extends AgentTask { private long replicaId; private List<TBackend> srcBackends; private TStorageMedium storageMedium; + private TBackend destBackend; private long visibleVersion; @@ -43,10 +44,11 @@ public class CloneTask extends AgentTask { private int taskVersion = VERSION_1; - public CloneTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, - long replicaId, int schemaHash, List<TBackend> srcBackends, TStorageMedium storageMedium, - long visibleVersion, int timeoutS) { + public CloneTask(TBackend destBackend, long backendId, long dbId, long tableId, long partitionId, + long indexId, long tabletId, long replicaId, int schemaHash, List<TBackend> srcBackends, + TStorageMedium storageMedium, long visibleVersion, int timeoutS) { super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId, indexId, tabletId); + this.destBackend = destBackend; this.replicaId = replicaId; this.schemaHash = schemaHash; this.srcBackends = srcBackends; @@ -95,15 +97,16 @@ public class CloneTask extends AgentTask { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("tablet id: ").append(tabletId).append(", replica id: ").append(replicaId).append(", schema hash: ") - .append(schemaHash); + sb.append("tablet id: ").append(tabletId) + .append(", replica id: ").append(replicaId) + .append(", schema hash: ").append(schemaHash); sb.append(", storageMedium: ").append(storageMedium.name()); sb.append(", visible version: ").append(visibleVersion); sb.append(", src backend: ").append(srcBackends.get(0).getHost()) .append(", src path hash: ").append(srcPathHash); - sb.append(", src backend: ").append(srcBackends.get(0).getHost()).append(", src path hash: ") - .append(srcPathHash); - sb.append(", dest backend: ").append(backendId).append(", dest path hash: ").append(destPathHash); + sb.append(", dest backend id: ").append(backendId) + .append(", dest backend: ").append(destBackend.getHost()) + .append(", dest path hash: ").append(destPathHash); return sb.toString(); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java new file mode 100644 index 0000000000..43ea5340bc --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.analysis.AlterSystemStmt; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.common.Config; +import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TDisk; +import org.apache.doris.thrift.TStorageMedium; +import org.apache.doris.utframe.UtFrameUtils; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +public class DecommissionTest { + private static final Logger LOG = LogManager.getLogger(TabletReplicaTooSlowTest.class); + // use a unique dir so that it won't be conflict with other unit test which + // may also start a Mocked Frontend + private static String runningDirBase = "fe"; + private static String runningDir = runningDirBase + "/mocked/DecommissionTest/" + UUID.randomUUID() + "/"; + private static ConnectContext connectContext; + + private static Random random = new Random(System.currentTimeMillis()); + + private long id = 10086; + + private final SystemInfoService systemInfoService = new SystemInfoService(); + private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex(); + + @BeforeClass + public static void beforeClass() throws Exception { + FeConstants.runningUnitTest = true; + System.out.println(runningDir); + FeConstants.runningUnitTest = true; + FeConstants.tablet_checker_interval_ms = 200; + FeConstants.tablet_schedule_interval_ms = 2000; + Config.tablet_repair_delay_factor_second = 1; + Config.enable_round_robin_create_tablet = true; + Config.schedule_slot_num_per_hdd_path = 10000; + Config.max_scheduling_tablets = 10000; + Config.schedule_batch_size = 10000; + Config.disable_balance = true; + // 4 backends: + // 127.0.0.1 + // 127.0.0.2 + // 127.0.0.3 + // 127.0.0.4 + UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 4); + List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends(); + for (Backend be : backends) { + Map<String, TDisk> backendDisks = Maps.newHashMap(); + TDisk tDisk1 = new TDisk(); + tDisk1.setRootPath("/home/doris1.HDD"); + tDisk1.setDiskTotalCapacity(20000000); + tDisk1.setDataUsedCapacity(1); + tDisk1.setUsed(true); + tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity - tDisk1.data_used_capacity); + tDisk1.setPathHash(random.nextLong()); + tDisk1.setStorageMedium(TStorageMedium.HDD); + backendDisks.put(tDisk1.getRootPath(), tDisk1); + + TDisk tDisk2 = new TDisk(); + tDisk2.setRootPath("/home/doris2.HHD"); + tDisk2.setDiskTotalCapacity(20000000); + tDisk2.setDataUsedCapacity(1); + tDisk2.setUsed(true); + tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity - tDisk2.data_used_capacity); + tDisk2.setPathHash(random.nextLong()); + tDisk2.setStorageMedium(TStorageMedium.HDD); + backendDisks.put(tDisk2.getRootPath(), tDisk2); + + be.updateDisks(backendDisks); + } + + connectContext = UtFrameUtils.createDefaultCtx(); + + // create database + String createDbStmtStr = "create database test;"; + CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext); + Env.getCurrentEnv().createDb(createDbStmt); + } + + @AfterClass + public static void tearDown() { + //UtFrameUtils.cleanDorisFeDir(runningDirBase); + } + + private static void createTable(String sql) throws Exception { + CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); + Env.getCurrentEnv().createTable(createTableStmt); + RebalancerTestUtil.updateReplicaPathHash(); + } + + @Test + public void testDecommissionBackend() throws Exception { + // test colocate tablet repair + String createStr = "create table test.tbl1\n" + + "(k1 date, k2 int)\n" + + "distributed by hash(k2) buckets 2400\n" + + "properties\n" + + "(\n" + + " \"replication_num\" = \"1\"\n" + + ")"; + ExceptionChecker.expectThrowsNoException(() -> createTable(createStr)); + int totalReplicaNum = 1 * 2400; + checkBalance(1, totalReplicaNum, 4); + + Backend backend = Env.getCurrentSystemInfo().getAllBackends().get(0); + String decommissionStmtStr = "alter system decommission backend \"" + backend.getHost() + + ":" + backend.getHeartbeatPort() + "\""; + AlterSystemStmt decommissionStmt = + (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(decommissionStmtStr, connectContext); + Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + + Assert.assertEquals(true, backend.isDecommissioned()); + + checkBalance(200, totalReplicaNum, 3); + } + + void checkBalance(int tryTimes, int totalReplicaNum, int backendNum) throws Exception { + int beReplicaNum = totalReplicaNum / backendNum; + for (int i = 0; i < tryTimes; i++) { + List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (backendNum != backendIds.size() && i != tryTimes - 1) { + Thread.sleep(1000); + continue; + } + + List<Integer> tabletNums = Lists.newArrayList(); + for (long beId : backendIds) { + tabletNums.add(Env.getCurrentInvertedIndex().getTabletNumByBackendId(beId)); + } + + Assert.assertEquals("tablet nums = " + tabletNums, backendNum, backendIds.size()); + for (int tabletNum : tabletNums) { + Assert.assertEquals("tablet nums = " + tabletNums, beReplicaNum, tabletNum); + } + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java index 9d5ffe1e6d..457466d72a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java @@ -20,7 +20,6 @@ package org.apache.doris.clone; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DataProperty; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.KeysType; @@ -36,7 +35,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.resource.Tag; -import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentTask; import org.apache.doris.task.StorageMediaMigrationTask; @@ -60,7 +58,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.LongStream; public class DiskRebalanceTest { @@ -79,10 +76,12 @@ public class DiskRebalanceTest { private final SystemInfoService systemInfoService = new SystemInfoService(); private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex(); private Map<Tag, LoadStatisticForTag> statisticMap; + private Map<Long, PathSlot> backendsWorkingSlots = Maps.newHashMap(); @Before public void setUp() throws Exception { Config.used_capacity_percent_max_diff = 1.0; + Config.balance_slot_num_per_path = 1; db = new Database(1, "test db"); db.setClusterName(SystemInfoService.DEFAULT_CLUSTER); new Expectations() { @@ -137,12 +136,19 @@ public class DiskRebalanceTest { Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1, 2, Lists.newArrayList(3L))); } - private void generateStatisticMap() { + private void generateStatisticsAndPathSlots() { LoadStatisticForTag loadStatistic = new LoadStatisticForTag(Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex); loadStatistic.init(); statisticMap = Maps.newHashMap(); statisticMap.put(Tag.DEFAULT_BACKEND_TAG, loadStatistic); + backendsWorkingSlots.clear(); + for (BackendLoadStatistic beStat : loadStatistic.getSortedBeLoadStats(null)) { + Map<Long, TStorageMedium> paths = Maps.newHashMap(); + beStat.getPathStatistics().stream().forEach( + path -> paths.put(path.getPathHash(), path.getStorageMedium())); + backendsWorkingSlots.put(beStat.getBeId(), new PathSlot(paths, beStat.getBeId())); + } } private void createPartitionsForTable(OlapTable olapTable, MaterializedIndex index, Long partitionCount) { @@ -187,8 +193,9 @@ public class DiskRebalanceTest { // case start Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG); - Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex()); - generateStatisticMap(); + generateStatisticsAndPathSlots(); + Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex(), + backendsWorkingSlots); rebalancer.updateLoadStatistic(statisticMap); List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets(); // check alternativeTablets; @@ -229,8 +236,9 @@ public class DiskRebalanceTest { // case start Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG); - Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex()); - generateStatisticMap(); + generateStatisticsAndPathSlots(); + Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex(), + backendsWorkingSlots); rebalancer.updateLoadStatistic(statisticMap); for (Map.Entry<Tag, LoadStatisticForTag> s : statisticMap.entrySet()) { if (s.getValue() != null) { @@ -240,16 +248,6 @@ public class DiskRebalanceTest { List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets(); // check alternativeTablets; Assert.assertEquals(2, alternativeTablets.size()); - Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap(); - for (Backend be : Env.getCurrentSystemInfo().getAllBackends()) { - if (!backendsWorkingSlots.containsKey(be.getId())) { - List<Long> pathHashes = be.getDisks().values().stream().map(DiskInfo::getPathHash) - .collect(Collectors.toList()); - PathSlot slot = new PathSlot(pathHashes, Config.schedule_slot_num_per_path); - backendsWorkingSlots.put(be.getId(), slot); - } - } - for (TabletSchedCtx tabletCtx : alternativeTablets) { LOG.info("try to schedule tablet {}", tabletCtx.getTabletId()); try { @@ -259,7 +257,7 @@ public class DiskRebalanceTest { tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId())); tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first - AgentTask task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots); + AgentTask task = rebalancer.createBalanceTask(tabletCtx); if (tabletCtx.getTabletSize() == 0) { Assert.fail("no exception"); } else { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java index c36ef531c2..fe47338398 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java @@ -196,7 +196,7 @@ public class RebalanceTest { @Test public void testPrioBackends() { - Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex()); + Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(), Env.getCurrentInvertedIndex(), null); // add { // CHECKSTYLE IGNORE THIS LINE List<Backend> backends = Lists.newArrayList(); @@ -232,7 +232,7 @@ public class RebalanceTest { // Call runAfterCatalogReady manually instead of starting daemon thread TabletSchedulerStat stat = new TabletSchedulerStat(); PartitionRebalancer rebalancer = new PartitionRebalancer(Env.getCurrentSystemInfo(), - Env.getCurrentInvertedIndex()); + Env.getCurrentInvertedIndex(), null); TabletScheduler tabletScheduler = new TabletScheduler(env, systemInfoService, invertedIndex, stat, ""); // The rebalancer inside the scheduler will use this rebalancer, for getToDeleteReplicaId Deencapsulation.setField(tabletScheduler, "rebalancer", rebalancer); @@ -256,7 +256,7 @@ public class RebalanceTest { tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first // createCloneReplicaAndTask, create replica will change invertedIndex too. - AgentTask task = rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots()); + AgentTask task = rebalancer.createBalanceTask(tabletCtx); batchTask.addTask(task); } catch (SchedException e) { LOG.warn("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java index 7d43a5fb77..95f71d0b51 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java @@ -19,6 +19,7 @@ package org.apache.doris.clone; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; @@ -32,6 +33,7 @@ import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Table; import java.util.List; import java.util.Map; @@ -106,4 +108,25 @@ public class RebalancerTestUtil { invertedIndex.addReplica(tablet.getId(), replica); }); } + + public static void updateReplicaPathHash() { + Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex().getReplicaMetaTable(); + for (Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) { + long beId = cell.getColumnKey(); + Backend be = Env.getCurrentSystemInfo().getBackend(beId); + if (be == null) { + continue; + } + Replica replica = cell.getValue(); + TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey()); + ImmutableMap<String, DiskInfo> diskMap = be.getDisks(); + for (DiskInfo diskInfo : diskMap.values()) { + if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) { + replica.setPathHash(diskInfo.getPathHash()); + break; + } + } + } + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java index a0d8dd94c0..efb22d333a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java @@ -31,18 +31,22 @@ public class RootPathLoadStatisticTest { @Test public void test() { - RootPathLoadStatistic usageLow = new RootPathLoadStatistic(0L, "/home/disk1", 12345L, TStorageMedium.HDD, 4096L, + RootPathLoadStatistic usage1 = new RootPathLoadStatistic(0L, "/home/disk1", 12345L, TStorageMedium.HDD, 4096L, 1024L, DiskState.ONLINE); - RootPathLoadStatistic usageHigh = new RootPathLoadStatistic(0L, "/home/disk2", 67890L, TStorageMedium.HDD, + RootPathLoadStatistic usage2 = new RootPathLoadStatistic(0L, "/home/disk2", 67890L, TStorageMedium.HDD, 4096L, 2048L, DiskState.ONLINE); List<RootPathLoadStatistic> list = Lists.newArrayList(); - list.add(usageLow); - list.add(usageHigh); + list.add(usage1); + list.add(usage2); // low usage should be ahead Collections.sort(list); - Assert.assertTrue(list.get(0).getPathHash() == usageLow.getPathHash()); + Assert.assertTrue(list.get(0).getPathHash() == usage1.getPathHash()); + + usage1.incrCopingSizeB(2048L); + Collections.sort(list); + Assert.assertTrue(list.get(1).getPathHash() == usage1.getPathHash()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java index 5c58954952..0e9f1446c7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java @@ -26,7 +26,6 @@ import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.catalog.ColocateGroupSchema; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; @@ -36,7 +35,6 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; -import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -54,7 +52,6 @@ import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.utframe.UtFrameUtils; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Table; @@ -162,7 +159,7 @@ public class TabletRepairAndBalanceTest { CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); Env.getCurrentEnv().createTable(createTableStmt); // must set replicas' path hash, or the tablet scheduler won't work - updateReplicaPathHash(); + RebalancerTestUtil.updateReplicaPathHash(); } private static void dropTable(String sql) throws Exception { @@ -170,26 +167,6 @@ public class TabletRepairAndBalanceTest { Env.getCurrentEnv().dropTable(dropTableStmt); } - private static void updateReplicaPathHash() { - Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex().getReplicaMetaTable(); - for (Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) { - long beId = cell.getColumnKey(); - Backend be = Env.getCurrentSystemInfo().getBackend(beId); - if (be == null) { - continue; - } - Replica replica = cell.getValue(); - TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey()); - ImmutableMap<String, DiskInfo> diskMap = be.getDisks(); - for (DiskInfo diskInfo : diskMap.values()) { - if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) { - replica.setPathHash(diskInfo.getPathHash()); - break; - } - } - } - } - private static void alterTable(String sql) throws Exception { AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); Env.getCurrentEnv().getAlterInstance().processAlterTable(alterTableStmt); @@ -498,7 +475,7 @@ public class TabletRepairAndBalanceTest { ExceptionChecker.expectThrowsNoException(() -> createTable(createStr6)); OlapTable tbl3 = db.getOlapTableOrDdlException("col_tbl3"); - updateReplicaPathHash(); + RebalancerTestUtil.updateReplicaPathHash(); // Set one replica's state as DECOMMISSION, see if it can be changed to NORMAL Tablet oneTablet = null; Replica oneReplica = null; diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index 86d3fda79f..b0b862e3dd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -114,7 +114,8 @@ public class AgentTaskTest { // clone cloneTask = - new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, schemaHash1, + new CloneTask(new TBackend("host2", 8290, 8390), backendId1, dbId, tableId, partitionId, + indexId1, tabletId1, replicaId1, schemaHash1, Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, 3600); // storageMediaMigrationTask --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org