This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new f5e896f6ebf [improvement](balance) don't balance tablet which has unfinish alter job #39121 (#39202) f5e896f6ebf is described below commit f5e896f6ebf51171b2c338763a2d95b290dc47aa Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Tue Aug 13 09:33:26 2024 +0800 [improvement](balance) don't balance tablet which has unfinish alter job #39121 (#39202) cherry pick from #39121 --- .../main/java/org/apache/doris/alter/Alter.java | 23 +++++++++++++++++ .../apache/doris/alter/SchemaChangeHandler.java | 4 +++ .../org/apache/doris/clone/BeLoadRebalancer.java | 23 +---------------- .../org/apache/doris/clone/DiskRebalancer.java | 14 ++-------- .../apache/doris/clone/PartitionRebalancer.java | 2 +- .../java/org/apache/doris/clone/Rebalancer.java | 30 +++++++++++++++++++++- .../org/apache/doris/clone/TabletScheduler.java | 4 +++ 7 files changed, 64 insertions(+), 36 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index ebf99ce2e9e..12166bfe4b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -78,6 +78,7 @@ import org.apache.doris.thrift.TTabletType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -86,6 +87,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; public class Alter { private static final Logger LOG = LogManager.getLogger(Alter.class); @@ -892,6 +894,27 @@ public class Alter { } } + public Set<Long> getUnfinishedAlterTableIds() { + Set<Long> unfinishedTableIds = Sets.newHashSet(); + for (AlterJobV2 job : schemaChangeHandler.getAlterJobsV2().values()) { + if (!job.isDone()) { + unfinishedTableIds.add(job.getTableId()); + } + } + for (IndexChangeJob job : ((SchemaChangeHandler) schemaChangeHandler).getIndexChangeJobs().values()) { + if (!job.isDone()) { + unfinishedTableIds.add(job.getTableId()); + } + } + for (AlterJobV2 job : materializedViewHandler.getAlterJobsV2().values()) { + if (!job.isDone()) { + unfinishedTableIds.add(job.getTableId()); + } + } + + return unfinishedTableIds; + } + public AlterHandler getSchemaChangeHandler() { return schemaChangeHandler; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 86ffb55ea76..09e8c984c07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1734,6 +1734,10 @@ public class SchemaChangeHandler extends AlterHandler { } } + public Map<Long, IndexChangeJob> getIndexChangeJobs() { + return indexChangeJobs; + } + public List<List<Comparable>> getAllIndexChangeJobInfos() { List<List<Comparable>> indexChangeJobInfos = new LinkedList<>(); for (IndexChangeJob indexChangeJob : ImmutableList.copyOf(indexChangeJobs.values())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 0da7428e422..78452000ca5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -17,9 +17,6 @@ package org.apache.doris.clone; -import org.apache.doris.catalog.CatalogRecycleBin; -import org.apache.doris.catalog.ColocateTableIndex; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; @@ -31,7 +28,6 @@ import org.apache.doris.clone.SchedException.SubCode; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -120,15 +116,7 @@ public class BeLoadRebalancer extends Rebalancer { LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); List<String> alternativeTabletInfos = Lists.newArrayList(); - - // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) - // so in clone ut recycleBin need to set to null. - CatalogRecycleBin recycleBin = null; - if (!FeConstants.runningUnitTest) { - recycleBin = Env.getCurrentRecycleBin(); - } int clusterAvailableBEnum = infoService.getAllBackendIds(true).size(); - ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); List<Set<Long>> lowBETablets = lowBEs.stream() .map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId()))) .collect(Collectors.toList()); @@ -230,11 +218,7 @@ public class BeLoadRebalancer extends Rebalancer { long replicaDataSize = replica.getDataSize(); if (remainingPaths.containsKey(replicaPathHash)) { TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); - if (tabletMeta == null) { - continue; - } - - if (colocateTableIndex.isColocateTable(tabletMeta.getTableId())) { + if (!canBalanceTablet(tabletMeta)) { continue; } @@ -245,11 +229,6 @@ public class BeLoadRebalancer extends Rebalancer { continue; } - if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(), - tabletMeta.getTableId(), tabletMeta.getPartitionId())) { - continue; - } - boolean isFit = lowBEs.stream().anyMatch(be -> be.isFit(replicaDataSize, medium, null, false) == BalanceStatus.OK); if (!isFit) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index 96eef52d597..a8448b8ffd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -17,7 +17,6 @@ package org.apache.doris.clone; -import org.apache.doris.catalog.CatalogRecycleBin; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; @@ -59,6 +58,7 @@ public class DiskRebalancer extends Rebalancer { public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex, Map<Long, PathSlot> backendsWorkingSlots) { super(infoService, invertedIndex, backendsWorkingSlots); + canBalanceColocateTable = true; } public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) { @@ -163,12 +163,6 @@ public class DiskRebalancer extends Rebalancer { return alternativeTablets; } - // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) - // so in clone ut recycleBin need to set to null. - CatalogRecycleBin recycleBin = null; - if (!FeConstants.runningUnitTest) { - recycleBin = Env.getCurrentRecycleBin(); - } Set<Long> alternativeTabletIds = Sets.newHashSet(); Set<Long> unbalancedBEs = Sets.newHashSet(); // choose tablets from backends randomly. @@ -243,11 +237,7 @@ public class DiskRebalancer extends Rebalancer { long replicaPathHash = replica.getPathHash(); if (remainingPaths.containsKey(replicaPathHash)) { TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); - if (tabletMeta == null) { - continue; - } - if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(), - tabletMeta.getTableId(), tabletMeta.getPartitionId())) { + if (!canBalanceTablet(tabletMeta)) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index 7095ad8dc54..5af920c74fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -138,7 +138,7 @@ public class PartitionRebalancer extends Rebalancer { invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium)); BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, TabletMeta tabletMeta) -> { - return tabletMeta != null + return canBalanceTablet(tabletMeta) && tabletMeta.getPartitionId() == move.partitionId && tabletMeta.getIndexId() == move.indexId && !invalidIds.contains(tabletId) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java index 682c2915989..af8bc6d67fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java @@ -17,9 +17,14 @@ package org.apache.doris.clone; +import org.apache.doris.catalog.CatalogRecycleBin; +import org.apache.doris.catalog.ColocateTableIndex; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.catalog.TabletMeta; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -29,13 +34,14 @@ import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.collect.Table; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; - +import java.util.Set; /* * Rebalancer is responsible for @@ -61,6 +67,9 @@ public abstract class Rebalancer { // be id -> end time of prio protected Map<Long, Long> prioBackends = Maps.newConcurrentMap(); + protected boolean canBalanceColocateTable = false; + private Set<Long> alterTableIds = Sets.newHashSet(); + // tag -> (medium, timestamp) private Table<Tag, TStorageMedium, Long> lastPickTimeTable = HashBasedTable.create(); @@ -106,6 +115,21 @@ public abstract class Rebalancer { return lastPickTime == null || now - lastPickTime >= Config.be_rebalancer_idle_seconds * 1000L; } + protected boolean canBalanceTablet(TabletMeta tabletMeta) { + // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) + // so in clone ut recycleBin need to set to null. + ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); + CatalogRecycleBin recycleBin = null; + if (!FeConstants.runningUnitTest) { + recycleBin = Env.getCurrentRecycleBin(); + } + return tabletMeta != null + && !alterTableIds.contains(tabletMeta.getTableId()) + && (canBalanceColocateTable || !colocateTableIndex.isColocateTable(tabletMeta.getTableId())) + && (recycleBin == null || !recycleBin.isRecyclePartition(tabletMeta.getDbId(), + tabletMeta.getTableId(), tabletMeta.getPartitionId())); + } + public AgentTask createBalanceTask(TabletSchedCtx tabletCtx) throws SchedException { completeSchedCtx(tabletCtx); @@ -139,6 +163,10 @@ public abstract class Rebalancer { this.statisticMap = statisticMap; } + public void updateAlterTableIds(Set<Long> alterTableIds) { + this.alterTableIds = alterTableIds; + } + public void addPrioBackends(List<Backend> backends, long timeoutS) { long currentTimeMillis = System.currentTimeMillis(); for (Backend backend : backends) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index a3a3a93e0fa..10a21c2f305 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -352,6 +352,10 @@ public class TabletScheduler extends MasterDaemon { rebalancer.updateLoadStatistic(statisticMap); diskRebalancer.updateLoadStatistic(statisticMap); + Set<Long> alterTableIds = Env.getCurrentEnv().getAlterInstance().getUnfinishedAlterTableIds(); + rebalancer.updateAlterTableIds(alterTableIds); + diskRebalancer.updateAlterTableIds(alterTableIds); + lastStatUpdateTime = System.currentTimeMillis(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org