This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f5e896f6ebf [improvement](balance) don't balance tablet which has 
unfinished alter job #39121 (#39202)
f5e896f6ebf is described below

commit f5e896f6ebf51171b2c338763a2d95b290dc47aa
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Tue Aug 13 09:33:26 2024 +0800

    [improvement](balance) don't balance tablet which has unfinished alter job 
#39121 (#39202)
    
    cherry pick from #39121
---
 .../main/java/org/apache/doris/alter/Alter.java    | 23 +++++++++++++++++
 .../apache/doris/alter/SchemaChangeHandler.java    |  4 +++
 .../org/apache/doris/clone/BeLoadRebalancer.java   | 23 +----------------
 .../org/apache/doris/clone/DiskRebalancer.java     | 14 ++--------
 .../apache/doris/clone/PartitionRebalancer.java    |  2 +-
 .../java/org/apache/doris/clone/Rebalancer.java    | 30 +++++++++++++++++++++-
 .../org/apache/doris/clone/TabletScheduler.java    |  4 +++
 7 files changed, 64 insertions(+), 36 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index ebf99ce2e9e..12166bfe4b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -78,6 +78,7 @@ import org.apache.doris.thrift.TTabletType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -86,6 +87,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 public class Alter {
     private static final Logger LOG = LogManager.getLogger(Alter.class);
@@ -892,6 +894,27 @@ public class Alter {
         }
     }
 
+    public Set<Long> getUnfinishedAlterTableIds() { // ids of tables having an alter job not done yet
+        Set<Long> unfinishedTableIds = Sets.newHashSet();
+        for (AlterJobV2 job : schemaChangeHandler.getAlterJobsV2().values()) { // schema change jobs
+            if (!job.isDone()) {
+                unfinishedTableIds.add(job.getTableId());
+            }
+        }
+        for (IndexChangeJob job : ((SchemaChangeHandler) 
schemaChangeHandler).getIndexChangeJobs().values()) {
+            if (!job.isDone()) { // index change job; NOTE(review): cast above assumes a SchemaChangeHandler
+                unfinishedTableIds.add(job.getTableId());
+            }
+        }
+        for (AlterJobV2 job : 
materializedViewHandler.getAlterJobsV2().values()) { // materialized view jobs
+            if (!job.isDone()) {
+                unfinishedTableIds.add(job.getTableId());
+            }
+        }
+
+        return unfinishedTableIds; // a fresh mutable set built on every call
+    }
+
     public AlterHandler getSchemaChangeHandler() { // returned as its AlterHandler supertype
         return schemaChangeHandler;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 86ffb55ea76..09e8c984c07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1734,6 +1734,10 @@ public class SchemaChangeHandler extends AlterHandler {
         }
     }
 
+    public Map<Long, IndexChangeJob> getIndexChangeJobs() { // NOTE(review): returns the live map, not a copy
+        return indexChangeJobs;
+    }
+
     public List<List<Comparable>> getAllIndexChangeJobInfos() {
         List<List<Comparable>> indexChangeJobInfos = new LinkedList<>();
         for (IndexChangeJob indexChangeJob : 
ImmutableList.copyOf(indexChangeJobs.values())) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 0da7428e422..78452000ca5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -17,9 +17,6 @@
 
 package org.apache.doris.clone;
 
-import org.apache.doris.catalog.CatalogRecycleBin;
-import org.apache.doris.catalog.ColocateTableIndex;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
@@ -31,7 +28,6 @@ import org.apache.doris.clone.SchedException.SubCode;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -120,15 +116,7 @@ public class BeLoadRebalancer extends Rebalancer {
         LOG.info("get number of low load paths: {}, with medium: {}", 
numOfLowPaths, medium);
 
         List<String> alternativeTabletInfos = Lists.newArrayList();
-
-        // Clone ut mocked env, but CatalogRecycleBin is not mockable (it 
extends from Thread)
-        // so in clone ut recycleBin need to set to null.
-        CatalogRecycleBin recycleBin = null;
-        if (!FeConstants.runningUnitTest) {
-            recycleBin = Env.getCurrentRecycleBin();
-        }
         int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
-        ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
         List<Set<Long>> lowBETablets = lowBEs.stream()
                 .map(beStat -> 
Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
                 .collect(Collectors.toList());
@@ -230,11 +218,7 @@ public class BeLoadRebalancer extends Rebalancer {
                 long replicaDataSize = replica.getDataSize();
                 if (remainingPaths.containsKey(replicaPathHash)) {
                     TabletMeta tabletMeta = 
invertedIndex.getTabletMeta(tabletId);
-                    if (tabletMeta == null) {
-                        continue;
-                    }
-
-                    if 
(colocateTableIndex.isColocateTable(tabletMeta.getTableId())) {
+                    if (!canBalanceTablet(tabletMeta)) {
                         continue;
                     }
 
@@ -245,11 +229,6 @@ public class BeLoadRebalancer extends Rebalancer {
                         continue;
                     }
 
-                    if (recycleBin != null && 
recycleBin.isRecyclePartition(tabletMeta.getDbId(),
-                            tabletMeta.getTableId(), 
tabletMeta.getPartitionId())) {
-                        continue;
-                    }
-
                     boolean isFit = lowBEs.stream().anyMatch(be -> 
be.isFit(replicaDataSize,
                             medium, null, false) == BalanceStatus.OK);
                     if (!isFit) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index 96eef52d597..a8448b8ffd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.clone;
 
-import org.apache.doris.catalog.CatalogRecycleBin;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.TabletInvertedIndex;
@@ -59,6 +58,7 @@ public class DiskRebalancer extends Rebalancer {
     public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex 
invertedIndex,
             Map<Long, PathSlot> backendsWorkingSlots) {
         super(infoService, invertedIndex, backendsWorkingSlots);
+        canBalanceColocateTable = true; // canBalanceTablet will not skip colocate tablets here
     }
 
     public List<BackendLoadStatistic> 
filterByPrioBackends(List<BackendLoadStatistic> bes) {
@@ -163,12 +163,6 @@ public class DiskRebalancer extends Rebalancer {
             return alternativeTablets;
         }
 
-        // Clone ut mocked env, but CatalogRecycleBin is not mockable (it 
extends from Thread)
-        // so in clone ut recycleBin need to set to null.
-        CatalogRecycleBin recycleBin = null;
-        if (!FeConstants.runningUnitTest) {
-            recycleBin = Env.getCurrentRecycleBin();
-        }
         Set<Long> alternativeTabletIds = Sets.newHashSet();
         Set<Long> unbalancedBEs = Sets.newHashSet();
         // choose tablets from backends randomly.
@@ -243,11 +237,7 @@ public class DiskRebalancer extends Rebalancer {
                 long replicaPathHash = replica.getPathHash();
                 if (remainingPaths.containsKey(replicaPathHash)) {
                     TabletMeta tabletMeta = 
invertedIndex.getTabletMeta(tabletId);
-                    if (tabletMeta == null) {
-                        continue;
-                    }
-                    if (recycleBin != null && 
recycleBin.isRecyclePartition(tabletMeta.getDbId(),
-                            tabletMeta.getTableId(), 
tabletMeta.getPartitionId())) {
+                    if (!canBalanceTablet(tabletMeta)) {
                         continue;
                     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index 7095ad8dc54..5af920c74fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -138,7 +138,7 @@ public class PartitionRebalancer extends Rebalancer {
                     
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));
 
             BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, 
TabletMeta tabletMeta) -> {
-                return tabletMeta != null
+                return canBalanceTablet(tabletMeta)
                         && tabletMeta.getPartitionId() == move.partitionId
                         && tabletMeta.getIndexId() == move.indexId
                         && !invalidIds.contains(tabletId)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index 682c2915989..af8bc6d67fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -17,9 +17,14 @@
 
 package org.apache.doris.clone;
 
+import org.apache.doris.catalog.CatalogRecycleBin;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -29,13 +34,14 @@ import org.apache.doris.thrift.TStorageMedium;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
-
+import java.util.Set;
 
 /*
  * Rebalancer is responsible for
@@ -61,6 +67,9 @@ public abstract class Rebalancer {
     // be id -> end time of prio
     protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
 
+    protected boolean canBalanceColocateTable = false;
+    private Set<Long> alterTableIds = Sets.newHashSet();
+
     // tag -> (medium, timestamp)
     private Table<Tag, TStorageMedium, Long> lastPickTimeTable = 
HashBasedTable.create();
 
@@ -106,6 +115,21 @@ public abstract class Rebalancer {
         return lastPickTime == null || now - lastPickTime >= 
Config.be_rebalancer_idle_seconds * 1000L;
     }
 
+    protected boolean canBalanceTablet(TabletMeta tabletMeta) {
+        // Clone ut mocked env, but CatalogRecycleBin is not mockable (it 
extends from Thread)
+        // so in clone ut recycleBin stays null; it is only fetched below when not running unit tests.
+        ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); // NOTE(review): fetched per call
+        CatalogRecycleBin recycleBin = null;
+        if (!FeConstants.runningUnitTest) {
+            recycleBin = Env.getCurrentRecycleBin();
+        }
+        return tabletMeta != null
+                && !alterTableIds.contains(tabletMeta.getTableId()) // no unfinished alter job on the table
+                && (canBalanceColocateTable || 
!colocateTableIndex.isColocateTable(tabletMeta.getTableId()))
+                && (recycleBin == null || 
!recycleBin.isRecyclePartition(tabletMeta.getDbId(),
                        tabletMeta.getTableId(), tabletMeta.getPartitionId())); // partition not in recycle bin
+    }
+
     public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
             throws SchedException {
         completeSchedCtx(tabletCtx);
@@ -139,6 +163,10 @@ public abstract class Rebalancer {
         this.statisticMap = statisticMap;
     }
 
+    public void updateAlterTableIds(Set<Long> alterTableIds) { // stored as-is, not copied
+        this.alterTableIds = alterTableIds;
+    }
+
     public void addPrioBackends(List<Backend> backends, long timeoutS) {
         long currentTimeMillis = System.currentTimeMillis();
         for (Backend backend : backends) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index a3a3a93e0fa..10a21c2f305 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -352,6 +352,10 @@ public class TabletScheduler extends MasterDaemon {
         rebalancer.updateLoadStatistic(statisticMap);
         diskRebalancer.updateLoadStatistic(statisticMap);
 
+        Set<Long> alterTableIds = 
Env.getCurrentEnv().getAlterInstance().getUnfinishedAlterTableIds();
+        rebalancer.updateAlterTableIds(alterTableIds);
+        diskRebalancer.updateAlterTableIds(alterTableIds);
+
         lastStatUpdateTime = System.currentTimeMillis();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to