This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 641fe19d193 [opt](cloud) Optimize balance speed by reducing the 
complexity of the rebalance algorithm (#51733)
641fe19d193 is described below

commit 641fe19d193fcbbd018988a60989aa67ecfd5e56
Author: deardeng <[email protected]>
AuthorDate: Fri Jul 4 22:18:05 2025 +0800

    [opt](cloud) Optimize balance speed by reducing the complexity of the 
rebalance algorithm (#51733)
    
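    ArrayList.remove(Object) is O(n), so repeatedly removing tablets from the per-BE lists made rebalancing too slow; the per-BE tablet collections are now HashSets.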
    <img width="489" alt="image" src="https://github.com/user-attachments/assets/ffa486f8-c9fb-4ee9-9f9c-408e189525d5" />
---
 .../doris/cloud/catalog/CloudTabletRebalancer.java | 510 ++++++++++++---------
 1 file changed, 288 insertions(+), 222 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 1b014caea82..17d72dd7446 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -54,6 +54,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -69,31 +70,31 @@ import java.util.stream.Collectors;
 public class CloudTabletRebalancer extends MasterDaemon {
     private static final Logger LOG = 
LogManager.getLogger(CloudTabletRebalancer.class);
 
-    private volatile ConcurrentHashMap<Long, List<Tablet>> beToTabletsGlobal =
-            new ConcurrentHashMap<Long, List<Tablet>>();
+    private volatile ConcurrentHashMap<Long, Set<Tablet>> beToTabletsGlobal =
+            new ConcurrentHashMap<Long, Set<Tablet>>();
 
-    private volatile ConcurrentHashMap<Long, List<Tablet>> 
beToColocateTabletsGlobal =
-            new ConcurrentHashMap<Long, List<Tablet>>();
+    private volatile ConcurrentHashMap<Long, Set<Tablet>> 
beToColocateTabletsGlobal =
+            new ConcurrentHashMap<Long, Set<Tablet>>();
 
     // used for cloud tablet report
-    private volatile ConcurrentHashMap<Long, List<Tablet>> 
beToTabletsGlobalInSecondary =
-            new ConcurrentHashMap<Long, List<Tablet>>();
+    private volatile ConcurrentHashMap<Long, Set<Tablet>> 
beToTabletsGlobalInSecondary =
+            new ConcurrentHashMap<Long, Set<Tablet>>();
 
-    private Map<Long, List<Tablet>> futureBeToTabletsGlobal;
+    private Map<Long, Set<Tablet>> futureBeToTabletsGlobal;
 
     private Map<String, List<Long>> clusterToBes;
 
     private Set<Long> allBes;
 
     // partitionId -> indexId -> be -> tablet
-    private Map<Long, Map<Long, Map<Long, List<Tablet>>>> partitionToTablets;
+    private Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionToTablets;
 
-    private Map<Long, Map<Long, Map<Long, List<Tablet>>>> 
futurePartitionToTablets;
+    private Map<Long, Map<Long, Map<Long, Set<Tablet>>>> 
futurePartitionToTablets;
 
     // tableId -> be -> tablet
-    private Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable;
+    private Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable;
 
-    private Map<Long, Map<Long, List<Tablet>>> futureBeToTabletsInTable;
+    private Map<Long, Map<Long, Set<Tablet>>> futureBeToTabletsInTable;
 
     private Map<Long, Long> beToDecommissionedTime = new HashMap<Long, Long>();
 
@@ -157,7 +158,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
         public long srcBe;
         public long destBe;
         public boolean isGlobal;
-        public Map<Long, List<Tablet>> beToTablets;
+        public Map<Long, Set<Tablet>> beToTablets;
         public long startTimestamp;
         BalanceType balanceType;
     }
@@ -172,7 +173,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     public Set<Long> getSnapshotTabletsInPrimaryByBeId(Long beId) {
         Set<Long> tabletIds = Sets.newHashSet();
-        List<Tablet> tablets = beToTabletsGlobal.get(beId);
+        Set<Tablet> tablets = beToTabletsGlobal.get(beId);
         if (tablets != null) {
             for (Tablet tablet : tablets) {
                 tabletIds.add(tablet.getId());
@@ -191,7 +192,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     public Set<Long> getSnapshotTabletsInSecondaryByBeId(Long beId) {
         Set<Long> tabletIds = Sets.newHashSet();
-        List<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId);
+        Set<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId);
         if (tablets != null) {
             for (Tablet tablet : tablets) {
                 tabletIds.add(tablet.getId());
@@ -208,8 +209,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     public int getTabletNumByBackendId(long beId) {
-        List<Tablet> tablets = beToTabletsGlobal.get(beId);
-        List<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
+        Set<Tablet> tablets = beToTabletsGlobal.get(beId);
+        Set<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
 
         return (tablets == null ? 0 : tablets.size())
                 + (colocateTablets == null ? 0 : colocateTablets.size());
@@ -232,80 +233,86 @@ public class CloudTabletRebalancer extends MasterDaemon {
         }
 
         LOG.info("cloud tablet rebalance begin");
-
-        clusterToBes = new HashMap<String, List<Long>>();
-        allBes = new HashSet<Long>();
         long start = System.currentTimeMillis();
 
-        // 1 build cluster to backend info
+        buildClusterToBackendMap();
+        if (!completeRouteInfo()) {
+            return;
+        }
+
+        checkInflightWarmUpCacheAsync();
+        statRouteInfo();
+        migrateTabletsForSmoothUpgrade();
+        statRouteInfo();
+
+        indexBalanced = true;
+        tableBalanced = true;
+
+        performBalancing();
+
+        checkDecommissionState(clusterToBes);
+        LOG.info("finished to rebalancer. cost: {} ms", 
(System.currentTimeMillis() - start));
+    }
+
+    /** Rebuilds {@code clusterToBes} and {@code allBes} from {@code cloudSystemInfoService}. */
+    private void buildClusterToBackendMap() {
+        clusterToBes = new HashMap<>();
+        allBes = new HashSet<>();
         for (Long beId : cloudSystemInfoService.getAllBackendIds()) {
             Backend be = cloudSystemInfoService.getBackend(beId);
             if (be == null) {
                 LOG.info("backend {} not found", beId);
                 continue;
             }
-            clusterToBes.putIfAbsent(be.getCloudClusterId(), new 
ArrayList<Long>());
+            clusterToBes.putIfAbsent(be.getCloudClusterId(), new 
ArrayList<>());
             clusterToBes.get(be.getCloudClusterId()).add(beId);
             allBes.add(beId);
         }
         LOG.info("cluster to backends {}", clusterToBes);
+    }
 
-        // 2 complete route info
-        if (!completeRouteInfo()) {
-            return;
-        }
-
-        // 3 check whether the inflight preheating task has been completed
-        checkInflghtWarmUpCacheAsync();
-
-        // 4 migrate tablet for smooth upgrade
+    /** Drains queued (srcBe, destBe) smooth-upgrade tasks and migrates tablets for each pair. */
+    private void migrateTabletsForSmoothUpgrade() {
         Pair<Long, Long> pair;
-        statRouteInfo();
         while (!tabletsMigrateTasks.isEmpty()) {
             try {
                 pair = tabletsMigrateTasks.take();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("begin tablets migration from be {} to be {}",
+                            pair.first, pair.second);
+                }
+                migrateTablets(pair.first, pair.second);
             } catch (InterruptedException e) {
+                LOG.warn("migrate tablets failed", e);
+                Thread.currentThread().interrupt(); // restore the interrupt flag before propagating
                 throw new RuntimeException(e);
             }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("begin tablets migration from be {} to be {}", 
pair.first, pair.second);
-            }
-            migrateTablets(pair.first, pair.second);
         }
+    }
 
-        // 5 statistics be to tablets mapping information
-        statRouteInfo();
-
-        indexBalanced = true;
-        tableBalanced = true;
-
-        // 6 partition-level balance
+    private void performBalancing() {
+        // ATTN: In general, the order of `balance` should follow `partition`, 
`table`, and `global`.
+        // This is because performing `global` scheduling first and then 
`partition` scheduling may
+        // lead to ineffective scheduling. Specifically, `global` scheduling 
might place multiple tablets belonging
+        // to the same table or partition onto the same BE, while `partition` 
scheduling later requires these tablets
+        // to be dispersed across different BEs, resulting in unnecessary 
scheduling.
         if (Config.enable_cloud_partition_balance) {
             balanceAllPartitions();
         }
-
-        // 7 if tablets in partition-level already balanced, perform table 
balance
         if (Config.enable_cloud_table_balance && indexBalanced) {
             balanceAllTables();
         }
-
-        // 8 if tablets in partition-level and table-level already balanced, 
perform global balance
         if (Config.enable_cloud_global_balance && indexBalanced && 
tableBalanced) {
             globalBalance();
         }
-
-        // 9 check whether all tablets of decomission have been migrated
-        checkDecommissionState(clusterToBes);
-
-        LOG.info("finished to rebalancer. cost: {} ms", 
(System.currentTimeMillis() - start));
     }
 
     public void balanceAllPartitions() {
-        for (Map.Entry<Long, List<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
             LOG.info("before partition balance be {} tablet num {}", 
entry.getKey(), entry.getValue().size());
         }
 
-        for (Map.Entry<Long, List<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
             LOG.info("before partition balance be {} tablet num(current + pre 
heating inflight) {}",
                      entry.getKey(), entry.getValue().size());
         }
@@ -326,22 +327,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
             return;
         }
 
-        for (Map.Entry<Long, List<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
             LOG.info("after partition balance be {} tablet num {}", 
entry.getKey(), entry.getValue().size());
         }
 
-        for (Map.Entry<Long, List<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
             LOG.info("after partition balance be {} tablet num(current + pre 
heating inflight) {}",
                     entry.getKey(), entry.getValue().size());
         }
     }
 
     public void balanceAllTables() {
-        for (Map.Entry<Long, List<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
             LOG.info("before table balance be {} tablet num {}", 
entry.getKey(), entry.getValue().size());
         }
 
-        for (Map.Entry<Long, List<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
             LOG.info("before table balance be {} tablet num(current + pre 
heating inflight) {}",
                     entry.getKey(), entry.getValue().size());
         }
@@ -362,22 +363,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
             return;
         }
 
-        for (Map.Entry<Long, List<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
             LOG.info("after table balance be {} tablet num {}", 
entry.getKey(), entry.getValue().size());
         }
 
-        for (Map.Entry<Long, List<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
             LOG.info("after table balance be {} tablet num(current + pre 
heating inflight) {}",
                     entry.getKey(), entry.getValue().size());
         }
     }
 
     public void globalBalance() {
-        for (Map.Entry<Long, List<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
             LOG.info("before global balance be {} tablet num {}", 
entry.getKey(), entry.getValue().size());
         }
 
-        for (Map.Entry<Long, List<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
             LOG.info("before global balance be {} tablet num(current + pre 
heating inflight) {}",
                     entry.getKey(), entry.getValue().size());
         }
@@ -397,17 +398,17 @@ public class CloudTabletRebalancer extends MasterDaemon {
             return;
         }
 
-        for (Map.Entry<Long, List<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
beToTabletsGlobal.entrySet()) {
             LOG.info("after global balance be {} tablet num {}", 
entry.getKey(), entry.getValue().size());
         }
 
-        for (Map.Entry<Long, List<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
+        for (Map.Entry<Long, Set<Tablet>> entry : 
futureBeToTabletsGlobal.entrySet()) {
             LOG.info("after global balance be {} tablet num(current + pre 
heating inflight) {}",
                     entry.getKey(), entry.getValue().size());
         }
     }
 
-    public void checkInflghtWarmUpCacheAsync() {
+    public void checkInflightWarmUpCacheAsync() {
         Map<Long, List<InfightTask>> beToInfightTasks = new HashMap<Long, 
List<InfightTask>>();
 
         for (Map.Entry<InfightTablet, InfightTask> entry : 
tabletToInfightTask.entrySet()) {
@@ -494,7 +495,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
         for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
             List<Long> beList = entry.getValue();
             for (long beId : beList) {
-                List<Tablet> tablets = beToTabletsGlobal.get(beId);
+                Set<Tablet> tablets = beToTabletsGlobal.get(beId);
                 int tabletNum = tablets == null ? 0 : tablets.size();
                 Backend backend = cloudSystemInfoService.getBackend(beId);
                 if (backend == null) {
@@ -644,42 +645,42 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     public void fillBeToTablets(long be, long tableId, long partId, long 
indexId, Tablet tablet,
-            Map<Long, List<Tablet>> globalBeToTablets,
-            Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable,
-            Map<Long, Map<Long, Map<Long, List<Tablet>>>> partToTablets) {
+            Map<Long, Set<Tablet>> globalBeToTablets,
+            Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
+            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
         // global
-        globalBeToTablets.putIfAbsent(be, new ArrayList<Tablet>());
+        globalBeToTablets.putIfAbsent(be, new HashSet<Tablet>());
         globalBeToTablets.get(be).add(tablet);
 
         // table
-        beToTabletsInTable.putIfAbsent(tableId, new HashMap<Long, 
List<Tablet>>());
-        Map<Long, List<Tablet>> beToTabletsOfTable = 
beToTabletsInTable.get(tableId);
-        beToTabletsOfTable.putIfAbsent(be, new ArrayList<Tablet>());
+        beToTabletsInTable.putIfAbsent(tableId, new HashMap<Long, 
Set<Tablet>>());
+        Map<Long, Set<Tablet>> beToTabletsOfTable = 
beToTabletsInTable.get(tableId);
+        beToTabletsOfTable.putIfAbsent(be, new HashSet<Tablet>());
         beToTabletsOfTable.get(be).add(tablet);
 
         // partition
-        partToTablets.putIfAbsent(partId, new HashMap<Long, Map<Long, 
List<Tablet>>>());
-        Map<Long, Map<Long, List<Tablet>>> indexToTablets = 
partToTablets.get(partId);
-        indexToTablets.putIfAbsent(indexId, new HashMap<Long, List<Tablet>>());
-        Map<Long, List<Tablet>> beToTabletsOfIndex = 
indexToTablets.get(indexId);
-        beToTabletsOfIndex.putIfAbsent(be, new ArrayList<Tablet>());
+        partToTablets.putIfAbsent(partId, new HashMap<Long, Map<Long, 
Set<Tablet>>>());
+        Map<Long, Map<Long, Set<Tablet>>> indexToTablets = 
partToTablets.get(partId);
+        indexToTablets.putIfAbsent(indexId, new HashMap<Long, Set<Tablet>>());
+        Map<Long, Set<Tablet>> beToTabletsOfIndex = 
indexToTablets.get(indexId);
+        beToTabletsOfIndex.putIfAbsent(be, new HashSet<Tablet>());
         beToTabletsOfIndex.get(be).add(tablet);
     }
 
     public void statRouteInfo() {
-        ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobal = new 
ConcurrentHashMap<Long, List<Tablet>>();
-        ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobalInSecondary
-                = new ConcurrentHashMap<Long, List<Tablet>>();
-        ConcurrentHashMap<Long, List<Tablet>> tmpBeToColocateTabletsGlobal
-                = new ConcurrentHashMap<Long, List<Tablet>>();
+        ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobal = new 
ConcurrentHashMap<Long, Set<Tablet>>();
+        ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobalInSecondary
+                = new ConcurrentHashMap<Long, Set<Tablet>>();
+        ConcurrentHashMap<Long, Set<Tablet>> tmpBeToColocateTabletsGlobal
+                = new ConcurrentHashMap<Long, Set<Tablet>>();
 
-        futureBeToTabletsGlobal = new HashMap<Long, List<Tablet>>();
+        futureBeToTabletsGlobal = new HashMap<Long, Set<Tablet>>();
 
-        partitionToTablets = new HashMap<Long, Map<Long, Map<Long, 
List<Tablet>>>>();
-        futurePartitionToTablets = new HashMap<Long, Map<Long, Map<Long, 
List<Tablet>>>>();
+        partitionToTablets = new HashMap<Long, Map<Long, Map<Long, 
Set<Tablet>>>>();
+        futurePartitionToTablets = new HashMap<Long, Map<Long, Map<Long, 
Set<Tablet>>>>();
 
-        beToTabletsInTable = new HashMap<Long, Map<Long, List<Tablet>>>();
-        futureBeToTabletsInTable = new HashMap<Long, Map<Long, 
List<Tablet>>>();
+        beToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
+        futureBeToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
 
         loopCloudReplica((Database db, Table table, Partition partition, 
MaterializedIndex index, String cluster) -> {
             boolean isColocated = 
Env.getCurrentColocateIndex().isColocateTable(table.getId());
@@ -694,8 +695,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
                             continue;
                         }
                         if (allBes.contains(beId)) {
-                            List<Tablet> colocateTablets =
-                                    
tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new ArrayList<>());
+                            Set<Tablet> colocateTablets =
+                                    
tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new HashSet<>());
                             colocateTablets.add(tablet);
                         }
                         continue;
@@ -710,8 +711,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
                     Backend secondaryBe = replica.getSecondaryBackend(cluster);
                     long secondaryBeId = secondaryBe == null ? -1L : 
secondaryBe.getId();
                     if (allBes.contains(secondaryBeId)) {
-                        List<Tablet> tablets = tmpBeToTabletsGlobalInSecondary
-                                .computeIfAbsent(secondaryBeId, k -> new 
ArrayList<>());
+                        Set<Tablet> tablets = tmpBeToTabletsGlobalInSecondary
+                                .computeIfAbsent(secondaryBeId, k -> new 
HashSet<>());
                         tablets.add(tablet);
                     }
 
@@ -764,10 +765,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     public void balanceInPartition(List<Long> bes, String clusterId, 
List<UpdateCloudReplicaInfo> infos) {
         // balance all partition
-        for (Map.Entry<Long, Map<Long, Map<Long, List<Tablet>>>> 
partitionEntry : futurePartitionToTablets.entrySet()) {
-            Map<Long, Map<Long, List<Tablet>>> indexToTablets = 
partitionEntry.getValue();
+        for (Map.Entry<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionEntry 
: futurePartitionToTablets.entrySet()) {
+            Map<Long, Map<Long, Set<Tablet>>> indexToTablets = 
partitionEntry.getValue();
             // balance all index of a partition
-            for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : 
indexToTablets.entrySet()) {
+            for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry : 
indexToTablets.entrySet()) {
                 // balance a index
                 balanceImpl(bes, clusterId, entry.getValue(), 
BalanceType.PARTITION, infos);
             }
@@ -776,7 +777,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     public void balanceInTable(List<Long> bes, String clusterId, 
List<UpdateCloudReplicaInfo> infos) {
         // balance all tables
-        for (Map.Entry<Long, Map<Long, List<Tablet>>> entry : 
futureBeToTabletsInTable.entrySet()) {
+        for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry : 
futureBeToTabletsInTable.entrySet()) {
             balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, 
infos);
         }
     }
@@ -846,10 +847,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
         return null;
     }
 
-    private void updateBeToTablets(Tablet pickedTablet, long srcBe, long 
destBe, BalanceType balanceType,
-            Map<Long, List<Tablet>> globalBeToTablets,
-            Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable,
-            Map<Long, Map<Long, Map<Long, List<Tablet>>>> partToTablets) {
+    private void updateBeToTablets(Tablet pickedTablet, long srcBe, long 
destBe,
+            Map<Long, Set<Tablet>> globalBeToTablets,
+            Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
+            Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
         CloudReplica replica = (CloudReplica) 
pickedTablet.getReplicas().get(0);
         long tableId = replica.getTableId();
         long partId = replica.getPartitionId();
@@ -892,47 +893,91 @@ public class CloudTabletRebalancer extends MasterDaemon {
         }
     }
 
-    private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> 
beToTablets, long avgNum,
+    /**
+     * Picks a source/destination BE pair and, if a transfer is warranted, fills {@code pairInfo}.
+     * Returns false when either BE cannot be chosen or {@code isTransferValid} rejects the pair.
+     */
+    private boolean getTransferPair(List<Long> bes, Map<Long, Set<Tablet>> 
beToTablets, long avgNum,
                                      TransferPairInfo pairInfo) {
-        long destBe = bes.get(0);
-        long srcBe = bes.get(0);
+        long srcBe = findSourceBackend(bes, beToTablets);
+        long destBe = findDestinationBackend(bes, beToTablets, srcBe);
 
-        long minTabletsNum = Long.MAX_VALUE;
+        if (srcBe == -1 || destBe == -1) {
+            return false; // No valid backend found
+        }
+
+        long minTabletsNum = beToTablets.get(destBe) == null ? 0 : 
beToTablets.get(destBe).size();
+        long maxTabletsNum = beToTablets.get(srcBe) == null ? 0 : 
beToTablets.get(srcBe).size();
+
+        if (!isTransferValid(srcBe, minTabletsNum, maxTabletsNum, avgNum)) {
+            return false; // Transfer conditions not met
+        }
+
+        pairInfo.srcBe = srcBe;
+        pairInfo.destBe = destBe;
+        pairInfo.minTabletsNum = minTabletsNum;
+        pairInfo.maxTabletsNum = maxTabletsNum;
+        return true;
+    }
+
+    /**
+     * Returns the first decommissioning BE in {@code bes} that still holds tablets; otherwise the
+     * non-decommissioning BE holding the most tablets. Returns -1 if no known BE holds any tablet.
+     */
+    private long findSourceBackend(List<Long> bes, Map<Long, Set<Tablet>> 
beToTablets) {
+        long srcBe = -1;
         long maxTabletsNum = 0;
-        boolean srcDecommissioned = false;
 
         for (Long be : bes) {
-            long tabletNum = beToTablets.get(be) == null ? 0 : 
beToTablets.get(be).size();
-            if (tabletNum > maxTabletsNum) {
-                srcBe = be;
-                maxTabletsNum = tabletNum;
-            }
-
+            long tabletNum = beToTablets.getOrDefault(be, 
Collections.emptySet()).size();
             Backend backend = cloudSystemInfoService.getBackend(be);
-            if (backend == null) {
+
+            // A decommissioning BE with tablets left wins outright; otherwise track the fullest BE
+            if (backend != null) {
+                if (backend.isDecommissioning() && tabletNum > 0) {
+                    srcBe = be; // Mark as source if decommissioned and has 
tablets
+                    break; // Exit early if we found a decommissioned backend
+                }
+                if (!backend.isDecommissioning() && tabletNum > maxTabletsNum) 
{
+                    srcBe = be;
+                    maxTabletsNum = tabletNum;
+                }
+            } else {
                 LOG.info("backend {} not found", be);
-                continue;
-            }
-            if (tabletNum < minTabletsNum && backend.isAlive() && 
!backend.isDecommissioning()
-                    && !backend.isSmoothUpgradeSrc()) {
-                destBe = be;
-                minTabletsNum = tabletNum;
             }
         }
+        return srcBe;
+    }
+
+    /**
+     * Returns the alive, non-decommissioning, non-smooth-upgrade-source BE in {@code bes} holding
+     * the fewest tablets, or -1 if none qualifies. {@code srcBe} is currently not consulted.
+     */
+    private long findDestinationBackend(List<Long> bes, Map<Long, Set<Tablet>> 
beToTablets, long srcBe) {
+        long destBe = -1;
+        long minTabletsNum = Long.MAX_VALUE;
 
         for (Long be : bes) {
-            long tabletNum = beToTablets.get(be) == null ? 0 : 
beToTablets.get(be).size();
+            long tabletNum = beToTablets.getOrDefault(be, 
Collections.emptySet()).size();
             Backend backend = cloudSystemInfoService.getBackend(be);
-            if (backend == null) {
-                LOG.info("backend {} not found", be);
-                continue;
-            }
-            if (backend.isDecommissioning() && tabletNum > 0) {
-                srcBe = be;
-                srcDecommissioned = true;
-                break;
+            if (backend != null && backend.isAlive() && 
!backend.isDecommissioning() && !backend.isSmoothUpgradeSrc()) {
+                if (tabletNum < minTabletsNum) {
+                    destBe = be;
+                    minTabletsNum = tabletNum;
+                }
             }
         }
+        return destBe;
+    }
+
+    /**
+     * Decides whether moving a tablet off {@code srcBe} is worthwhile. A decommissioning source always
+     * qualifies; otherwise the counts are checked against the cloud_rebalance_percent_threshold band.
+     */
+    private boolean isTransferValid(long srcBe, long minTabletsNum, long 
maxTabletsNum, long avgNum) {
+        Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
+        // The BE may have been dropped since it was selected; treat a missing BE as not decommissioning.
+        boolean srcDecommissioned = srcBackend != null && srcBackend.isDecommissioning();
 
         if (!srcDecommissioned) {
             if ((maxTabletsNum < avgNum * (1 + 
Config.cloud_rebalance_percent_threshold)
@@ -941,142 +968,183 @@ public class CloudTabletRebalancer extends MasterDaemon 
{
                 return false;
             }
         }
-
-        pairInfo.srcBe = srcBe;
-        pairInfo.destBe = destBe;
-        pairInfo.minTabletsNum = minTabletsNum;
-        pairInfo.maxTabletsNum = maxTabletsNum;
         return true;
     }
 
     private boolean isConflict(long srcBe, long destBe, CloudReplica 
cloudReplica, BalanceType balanceType,
-            Map<Long, Map<Long, Map<Long, List<Tablet>>>> beToTabletsInParts,
-            Map<Long, Map<Long, List<Tablet>>> beToTabletsInTables) {
-        if (balanceType == balanceType.GLOBAL) {
-            // check is conflict with partition balance
-            long maxBeSize = 
beToTabletsInParts.get(cloudReplica.getPartitionId())
-                    .get(cloudReplica.getIndexId()).get(srcBe).size();
-            List<Tablet> destBeTablets = 
beToTabletsInParts.get(cloudReplica.getPartitionId())
-                    .get(cloudReplica.getIndexId()).get(destBe);
-            long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
-            if (minBeSize >= maxBeSize) {
-                return true;
-            }
-
-            // check is conflict with table balance
-            maxBeSize = 
beToTabletsInTables.get(cloudReplica.getTableId()).get(srcBe).size();
-            destBeTablets = 
beToTabletsInTables.get(cloudReplica.getTableId()).get(destBe);
-            minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
-            if (minBeSize >= maxBeSize) {
-                return true;
-            }
+                           Map<Long, Map<Long, Map<Long, Set<Tablet>>>> 
beToTabletsInParts,
+                           Map<Long, Map<Long, Set<Tablet>>> 
beToTabletsInTables) {
+        if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()) {
+            return false; // If source BE is decommissioned, no conflict
         }
 
-        if (balanceType == balanceType.TABLE) {
-            // check is conflict with partition balance
-            long maxBeSize = 
beToTabletsInParts.get(cloudReplica.getPartitionId())
-                    .get(cloudReplica.getIndexId()).get(srcBe).size();
-            List<Tablet> destBeTablets = 
beToTabletsInParts.get(cloudReplica.getPartitionId())
-                    .get(cloudReplica.getIndexId()).get(destBe);
-            long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
-            return minBeSize >= maxBeSize;
+        if (balanceType == BalanceType.GLOBAL) {
+            return checkGlobalBalanceConflict(srcBe, destBe, cloudReplica, 
beToTabletsInParts, beToTabletsInTables);
+        } else if (balanceType == BalanceType.TABLE) {
+            return checkTableBalanceConflict(srcBe, destBe, cloudReplica, 
beToTabletsInParts);
         }
 
         return false;
     }
 
-    private void balanceImpl(List<Long> bes, String clusterId, Map<Long, 
List<Tablet>> beToTablets,
+    private boolean checkGlobalBalanceConflict(long srcBe, long destBe, 
CloudReplica cloudReplica,
+                                               Map<Long, Map<Long, Map<Long, 
Set<Tablet>>>> beToTabletsInParts,
+                                               Map<Long, Map<Long, 
Set<Tablet>>> beToTabletsInTables) {
+        long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, 
beToTabletsInParts);
+        long minBeSize = getTabletSizeInParts(destBe, cloudReplica, 
beToTabletsInParts);
+
+        if (minBeSize >= maxBeSize) {
+            return true; // Conflict detected
+        }
+
+        maxBeSize = getTabletSizeInBes(srcBe, cloudReplica, 
beToTabletsInTables);
+        minBeSize = getTabletSizeInBes(destBe, cloudReplica, 
beToTabletsInTables);
+
+        return minBeSize >= maxBeSize; // Conflict detected
+    }
+
+    private boolean checkTableBalanceConflict(long srcBe, long destBe, 
CloudReplica cloudReplica,
+                                              Map<Long, Map<Long, Map<Long, 
Set<Tablet>>>> beToTabletsInParts) {
+        long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, 
beToTabletsInParts);
+        long minBeSize = getTabletSizeInParts(destBe, cloudReplica, 
beToTabletsInParts);
+
+        return minBeSize >= maxBeSize; // Conflict detected
+    }
+
+    private long getTabletSizeInParts(long beId, CloudReplica cloudReplica,
+                                         Map<Long, Map<Long, Map<Long, 
Set<Tablet>>>> beToTabletsInParts) {
+        Set<Tablet> tablets = 
beToTabletsInParts.get(cloudReplica.getPartitionId())
+                .get(cloudReplica.getIndexId()).get(beId);
+        return tablets == null ? 0 : tablets.size();
+    }
+
+    private long getTabletSizeInBes(long beId, CloudReplica cloudReplica,
+                                    Map<Long, Map<Long, Set<Tablet>>> 
beToTabletsInTables) {
+        Set<Tablet> tablets = 
beToTabletsInTables.get(cloudReplica.getTableId()).get(beId);
+        return tablets == null ? 0 : tablets.size();
+    }
+
+
+    private void balanceImpl(List<Long> bes, String clusterId, Map<Long, 
Set<Tablet>> beToTablets,
             BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
         if (bes == null || bes.isEmpty() || beToTablets == null || 
beToTablets.isEmpty()) {
             return;
         }
 
-        long totalTabletsNum = 0;
-        long beNum = 0;
-        for (Long be : bes) {
-            long tabletNum = beToTablets.get(be) == null ? 0 : 
beToTablets.get(be).size();
-            Backend backend = cloudSystemInfoService.getBackend(be);
-            if (backend != null && !backend.isDecommissioning()) {
-                beNum++;
-            }
-            totalTabletsNum += tabletNum;
-        }
+        long totalTabletsNum = calculateTotalTablets(bes, beToTablets);
+        long beNum = countActiveBackends(bes);
+
         if (beNum == 0) {
             LOG.warn("zero be, but want balance, skip");
             return;
         }
+
         long avgNum = totalTabletsNum / beNum;
-        long transferNum = Math.max(Math.round(avgNum * 
Config.cloud_balance_tablet_percent_per_run),
-                                    
Config.cloud_min_balance_tablet_num_per_run);
+        long transferNum = calculateTransferNum(avgNum);
 
         for (int i = 0; i < transferNum; i++) {
             TransferPairInfo pairInfo = new TransferPairInfo();
             if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) {
-                // no need balance;
-                break;
-            }
-
-            if (balanceType == balanceType.PARTITION) {
-                indexBalanced = false;
+                break; // no need balance
             }
 
-            if (balanceType == balanceType.TABLE) {
-                tableBalanced = false;
-            }
+            updateBalanceStatus(balanceType);
 
             long srcBe = pairInfo.srcBe;
             long destBe = pairInfo.destBe;
-            long minTabletsNum = pairInfo.minTabletsNum;
-            long maxTabletsNum = pairInfo.maxTabletsNum;
 
-            int randomIndex = rand.nextInt(beToTablets.get(srcBe).size());
-            Tablet pickedTablet = beToTablets.get(srcBe).get(randomIndex);
-            CloudReplica cloudReplica = (CloudReplica) 
pickedTablet.getReplicas().get(0);
+            Tablet pickedTablet = pickRandomTablet(beToTablets.get(srcBe));
+            if (pickedTablet == null) {
+                continue; // No tablet to pick
+            }
 
+            CloudReplica cloudReplica = (CloudReplica) 
pickedTablet.getReplicas().get(0);
             Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);
-            // if srcBe is dead, destBe cann't download cache from it, 
preheating will failed
+
             if (Config.enable_cloud_warm_up_for_rebalance && srcBackend != 
null && srcBackend.isAlive()) {
-                if (isConflict(srcBe, destBe, cloudReplica, balanceType, 
futurePartitionToTablets,
-                        futureBeToTabletsInTable)) {
+                if (isConflict(srcBe, destBe, cloudReplica, balanceType,
+                        futurePartitionToTablets, futureBeToTabletsInTable)) {
                     continue;
                 }
-
-                try {
-                    sendPreHeatingRpc(pickedTablet, srcBe, destBe);
-                } catch (Exception e) {
-                    break;
-                }
-
-                InfightTask task = new InfightTask();
-                task.pickedTablet = pickedTablet;
-                task.srcBe = srcBe;
-                task.destBe = destBe;
-                task.balanceType = balanceType;
-                task.beToTablets = beToTablets;
-                task.startTimestamp = System.currentTimeMillis() / 1000;
-                tabletToInfightTask.put(new 
InfightTablet(pickedTablet.getId(), clusterId), task);
-
-                LOG.info("pre cache {} from {} to {}, cluster {} minNum {} 
maxNum {} beNum {} tabletsNum {}, part {}",
-                         pickedTablet.getId(), srcBe, destBe, clusterId,
-                         minTabletsNum, maxTabletsNum, beNum, totalTabletsNum, 
cloudReplica.getPartitionId());
-                updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
-                        futureBeToTabletsGlobal, futureBeToTabletsInTable, 
futurePartitionToTablets);
+                preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, 
balanceType, beToTablets);
             } else {
                 if (isConflict(srcBe, destBe, cloudReplica, balanceType, 
partitionToTablets, beToTabletsInTable)) {
                     continue;
                 }
+                transferTablet(pickedTablet, srcBe, destBe, clusterId, 
balanceType, infos);
+            }
+        }
+    }
 
-                LOG.info("transfer {} from {} to {}, cluster {} minNum {} 
maxNum {} beNum {} tabletsNum {}, part {}",
-                        pickedTablet.getId(), srcBe, destBe, clusterId,
-                        minTabletsNum, maxTabletsNum, beNum, totalTabletsNum, 
cloudReplica.getPartitionId());
+    private long calculateTotalTablets(List<Long> bes, Map<Long, Set<Tablet>> 
beToTablets) {
+        return bes.stream()
+                .mapToLong(be -> beToTablets.getOrDefault(be, 
Collections.emptySet()).size())
+                .sum();
+    }
 
-                updateBeToTablets(pickedTablet, srcBe, destBe, balanceType, 
beToTabletsGlobal,
-                        beToTabletsInTable, partitionToTablets);
-                updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
-                        futureBeToTabletsGlobal, futureBeToTabletsInTable, 
futurePartitionToTablets);
-                updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
-            }
+    private long countActiveBackends(List<Long> bes) {
+        return bes.stream()
+                .filter(be -> {
+                    Backend backend = cloudSystemInfoService.getBackend(be);
+                    return backend != null && !backend.isDecommissioning();
+                })
+                .count();
+    }
+
+    private long calculateTransferNum(long avgNum) {
+        return Math.max(Math.round(avgNum * 
Config.cloud_balance_tablet_percent_per_run),
+                        Config.cloud_min_balance_tablet_num_per_run);
+    }
+
+    private void updateBalanceStatus(BalanceType balanceType) {
+        if (balanceType == BalanceType.PARTITION) {
+            indexBalanced = false;
+        } else if (balanceType == BalanceType.TABLE) {
+            tableBalanced = false;
+        }
+    }
+
+    private Tablet pickRandomTablet(Set<Tablet> tablets) {
+        if (tablets.isEmpty()) {
+            return null;
+        }
+        int randomIndex = rand.nextInt(tablets.size());
+        return tablets.stream().skip(randomIndex).findFirst().orElse(null);
+    }
+
+    private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long 
destBe, String clusterId,
+                                     BalanceType balanceType, Map<Long, 
Set<Tablet>> beToTablets) {
+        try {
+            sendPreHeatingRpc(pickedTablet, srcBe, destBe);
+        } catch (Exception e) {
+            LOG.warn("Failed to preheat tablet {} from {} to {}, "
+                    + "help msg turn off fe config 
enable_cloud_warm_up_for_rebalance",
+                    pickedTablet.getId(), srcBe, destBe, e);
+            return;
         }
+
+        InfightTask task = new InfightTask();
+        task.pickedTablet = pickedTablet;
+        task.srcBe = srcBe;
+        task.destBe = destBe;
+        task.balanceType = balanceType;
+        task.beToTablets = beToTablets;
+        task.startTimestamp = System.currentTimeMillis() / 1000;
+        tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(), 
clusterId), task);
+
+        LOG.info("pre cache {} from {} to {}, cluster {}", 
pickedTablet.getId(), srcBe, destBe, clusterId);
+        updateBeToTablets(pickedTablet, srcBe, destBe,
+                futureBeToTabletsGlobal, futureBeToTabletsInTable, 
futurePartitionToTablets);
+    }
+
+    private void transferTablet(Tablet pickedTablet, long srcBe, long destBe, 
String clusterId,
+                            BalanceType balanceType, 
List<UpdateCloudReplicaInfo> infos) {
+        LOG.info("transfer {} from {} to {}, cluster {}", 
pickedTablet.getId(), srcBe, destBe, clusterId);
+        updateBeToTablets(pickedTablet, srcBe, destBe,
+                beToTabletsGlobal, beToTabletsInTable, partitionToTablets);
+        updateBeToTablets(pickedTablet, srcBe, destBe,
+                futureBeToTabletsGlobal, futureBeToTabletsInTable, 
futurePartitionToTablets);
+        updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
     }
 
     public void addTabletMigrationTask(Long srcBe, Long dstBe) {
@@ -1088,7 +1156,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
      */
     private void migrateTablets(Long srcBe, Long dstBe) {
         // get tablets
-        List<Tablet> tablets = beToTabletsGlobal.get(srcBe);
+        Set<Tablet> tablets = beToTabletsGlobal.get(srcBe);
         if (tablets == null || tablets.isEmpty()) {
             LOG.info("smooth upgrade srcBe={} does not have any tablets, set 
inactive", srcBe);
             ((CloudEnv) 
Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
@@ -1224,6 +1292,4 @@ public class CloudTabletRebalancer extends MasterDaemon {
         }
         return rets;
     }
-
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to