This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 641fe19d193 [opt](cloud) Optimize balance speed by reducing the
complexity of the rebalance algorithm (#51733)
641fe19d193 is described below
commit 641fe19d193fcbbd018988a60989aa67ecfd5e56
Author: deardeng <[email protected]>
AuthorDate: Fri Jul 4 22:18:05 2025 +0800
[opt](cloud) Optimize balance speed by reducing the complexity of the
rebalance algorithm (#51733)
ArrayList remove: time complexity is too high (O(n) per removal), so the per-backend tablet collections are changed from List to HashSet
<img width="489" alt="image"
src="https://github.com/user-attachments/assets/ffa486f8-c9fb-4ee9-9f9c-408e189525d5"
/>
---
.../doris/cloud/catalog/CloudTabletRebalancer.java | 510 ++++++++++++---------
1 file changed, 288 insertions(+), 222 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 1b014caea82..17d72dd7446 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -54,6 +54,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -69,31 +70,31 @@ import java.util.stream.Collectors;
public class CloudTabletRebalancer extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(CloudTabletRebalancer.class);
- private volatile ConcurrentHashMap<Long, List<Tablet>> beToTabletsGlobal =
- new ConcurrentHashMap<Long, List<Tablet>>();
+ private volatile ConcurrentHashMap<Long, Set<Tablet>> beToTabletsGlobal =
+ new ConcurrentHashMap<Long, Set<Tablet>>();
- private volatile ConcurrentHashMap<Long, List<Tablet>>
beToColocateTabletsGlobal =
- new ConcurrentHashMap<Long, List<Tablet>>();
+ private volatile ConcurrentHashMap<Long, Set<Tablet>>
beToColocateTabletsGlobal =
+ new ConcurrentHashMap<Long, Set<Tablet>>();
// used for cloud tablet report
- private volatile ConcurrentHashMap<Long, List<Tablet>>
beToTabletsGlobalInSecondary =
- new ConcurrentHashMap<Long, List<Tablet>>();
+ private volatile ConcurrentHashMap<Long, Set<Tablet>>
beToTabletsGlobalInSecondary =
+ new ConcurrentHashMap<Long, Set<Tablet>>();
- private Map<Long, List<Tablet>> futureBeToTabletsGlobal;
+ private Map<Long, Set<Tablet>> futureBeToTabletsGlobal;
private Map<String, List<Long>> clusterToBes;
private Set<Long> allBes;
// partitionId -> indexId -> be -> tablet
- private Map<Long, Map<Long, Map<Long, List<Tablet>>>> partitionToTablets;
+ private Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionToTablets;
- private Map<Long, Map<Long, Map<Long, List<Tablet>>>>
futurePartitionToTablets;
+ private Map<Long, Map<Long, Map<Long, Set<Tablet>>>>
futurePartitionToTablets;
// tableId -> be -> tablet
- private Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable;
+ private Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable;
- private Map<Long, Map<Long, List<Tablet>>> futureBeToTabletsInTable;
+ private Map<Long, Map<Long, Set<Tablet>>> futureBeToTabletsInTable;
private Map<Long, Long> beToDecommissionedTime = new HashMap<Long, Long>();
@@ -157,7 +158,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
public long srcBe;
public long destBe;
public boolean isGlobal;
- public Map<Long, List<Tablet>> beToTablets;
+ public Map<Long, Set<Tablet>> beToTablets;
public long startTimestamp;
BalanceType balanceType;
}
@@ -172,7 +173,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
public Set<Long> getSnapshotTabletsInPrimaryByBeId(Long beId) {
Set<Long> tabletIds = Sets.newHashSet();
- List<Tablet> tablets = beToTabletsGlobal.get(beId);
+ Set<Tablet> tablets = beToTabletsGlobal.get(beId);
if (tablets != null) {
for (Tablet tablet : tablets) {
tabletIds.add(tablet.getId());
@@ -191,7 +192,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
public Set<Long> getSnapshotTabletsInSecondaryByBeId(Long beId) {
Set<Long> tabletIds = Sets.newHashSet();
- List<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId);
+ Set<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId);
if (tablets != null) {
for (Tablet tablet : tablets) {
tabletIds.add(tablet.getId());
@@ -208,8 +209,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
public int getTabletNumByBackendId(long beId) {
- List<Tablet> tablets = beToTabletsGlobal.get(beId);
- List<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
+ Set<Tablet> tablets = beToTabletsGlobal.get(beId);
+ Set<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
return (tablets == null ? 0 : tablets.size())
+ (colocateTablets == null ? 0 : colocateTablets.size());
@@ -232,80 +233,80 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
LOG.info("cloud tablet rebalance begin");
-
- clusterToBes = new HashMap<String, List<Long>>();
- allBes = new HashSet<Long>();
long start = System.currentTimeMillis();
- // 1 build cluster to backend info
+ buildClusterToBackendMap();
+ if (!completeRouteInfo()) {
+ return;
+ }
+
+ checkInflightWarmUpCacheAsync();
+ statRouteInfo();
+ migrateTabletsForSmoothUpgrade();
+ statRouteInfo();
+
+ indexBalanced = true;
+ tableBalanced = true;
+
+ performBalancing();
+
+ checkDecommissionState(clusterToBes);
+ LOG.info("finished to rebalancer. cost: {} ms",
(System.currentTimeMillis() - start));
+ }
+
+ private void buildClusterToBackendMap() {
+ clusterToBes = new HashMap<>();
+ allBes = new HashSet<>();
for (Long beId : cloudSystemInfoService.getAllBackendIds()) {
Backend be = cloudSystemInfoService.getBackend(beId);
if (be == null) {
LOG.info("backend {} not found", beId);
continue;
}
- clusterToBes.putIfAbsent(be.getCloudClusterId(), new
ArrayList<Long>());
+ clusterToBes.putIfAbsent(be.getCloudClusterId(), new
ArrayList<>());
clusterToBes.get(be.getCloudClusterId()).add(beId);
allBes.add(beId);
}
LOG.info("cluster to backends {}", clusterToBes);
+ }
- // 2 complete route info
- if (!completeRouteInfo()) {
- return;
- }
-
- // 3 check whether the inflight preheating task has been completed
- checkInflghtWarmUpCacheAsync();
-
- // 4 migrate tablet for smooth upgrade
+ private void migrateTabletsForSmoothUpgrade() {
Pair<Long, Long> pair;
- statRouteInfo();
while (!tabletsMigrateTasks.isEmpty()) {
try {
pair = tabletsMigrateTasks.take();
+ LOG.debug("begin tablets migration from be {} to be {}",
pair.first, pair.second);
+ migrateTablets(pair.first, pair.second);
} catch (InterruptedException e) {
+ LOG.warn("migrate tablets failed", e);
throw new RuntimeException(e);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("begin tablets migration from be {} to be {}",
pair.first, pair.second);
- }
- migrateTablets(pair.first, pair.second);
}
+ }
- // 5 statistics be to tablets mapping information
- statRouteInfo();
-
- indexBalanced = true;
- tableBalanced = true;
-
- // 6 partition-level balance
+ private void performBalancing() {
+ // ATTN: In general, the order of `balance` should follow `partition`,
`table`, and `global`.
+ // This is because performing `global` scheduling first and then
`partition` scheduling may
+ // lead to ineffective scheduling. Specifically, `global` scheduling
might place multiple tablets belonging
+ // to the same table or partition onto the same BE, while `partition`
scheduling later requires these tablets
+ // to be dispersed across different BEs, resulting in unnecessary
scheduling.
if (Config.enable_cloud_partition_balance) {
balanceAllPartitions();
}
-
- // 7 if tablets in partition-level already balanced, perform table
balance
if (Config.enable_cloud_table_balance && indexBalanced) {
balanceAllTables();
}
-
- // 8 if tablets in partition-level and table-level already balanced,
perform global balance
if (Config.enable_cloud_global_balance && indexBalanced &&
tableBalanced) {
globalBalance();
}
-
- // 9 check whether all tablets of decomission have been migrated
- checkDecommissionState(clusterToBes);
-
- LOG.info("finished to rebalancer. cost: {} ms",
(System.currentTimeMillis() - start));
}
public void balanceAllPartitions() {
- for (Map.Entry<Long, List<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
LOG.info("before partition balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, List<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.info("before partition balance be {} tablet num(current + pre
heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
@@ -326,22 +327,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
return;
}
- for (Map.Entry<Long, List<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
LOG.info("after partition balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, List<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.info("after partition balance be {} tablet num(current + pre
heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
}
public void balanceAllTables() {
- for (Map.Entry<Long, List<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
LOG.info("before table balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, List<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.info("before table balance be {} tablet num(current + pre
heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
@@ -362,22 +363,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
return;
}
- for (Map.Entry<Long, List<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
LOG.info("after table balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, List<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.info("after table balance be {} tablet num(current + pre
heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
}
public void globalBalance() {
- for (Map.Entry<Long, List<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
LOG.info("before global balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, List<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.info("before global balance be {} tablet num(current + pre
heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
@@ -397,17 +398,17 @@ public class CloudTabletRebalancer extends MasterDaemon {
return;
}
- for (Map.Entry<Long, List<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
LOG.info("after global balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, List<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.info("after global balance be {} tablet num(current + pre
heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
}
- public void checkInflghtWarmUpCacheAsync() {
+ public void checkInflightWarmUpCacheAsync() {
Map<Long, List<InfightTask>> beToInfightTasks = new HashMap<Long,
List<InfightTask>>();
for (Map.Entry<InfightTablet, InfightTask> entry :
tabletToInfightTask.entrySet()) {
@@ -494,7 +495,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
List<Long> beList = entry.getValue();
for (long beId : beList) {
- List<Tablet> tablets = beToTabletsGlobal.get(beId);
+ Set<Tablet> tablets = beToTabletsGlobal.get(beId);
int tabletNum = tablets == null ? 0 : tablets.size();
Backend backend = cloudSystemInfoService.getBackend(beId);
if (backend == null) {
@@ -644,42 +645,42 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
public void fillBeToTablets(long be, long tableId, long partId, long
indexId, Tablet tablet,
- Map<Long, List<Tablet>> globalBeToTablets,
- Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable,
- Map<Long, Map<Long, Map<Long, List<Tablet>>>> partToTablets) {
+ Map<Long, Set<Tablet>> globalBeToTablets,
+ Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
+ Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
// global
- globalBeToTablets.putIfAbsent(be, new ArrayList<Tablet>());
+ globalBeToTablets.putIfAbsent(be, new HashSet<Tablet>());
globalBeToTablets.get(be).add(tablet);
// table
- beToTabletsInTable.putIfAbsent(tableId, new HashMap<Long,
List<Tablet>>());
- Map<Long, List<Tablet>> beToTabletsOfTable =
beToTabletsInTable.get(tableId);
- beToTabletsOfTable.putIfAbsent(be, new ArrayList<Tablet>());
+ beToTabletsInTable.putIfAbsent(tableId, new HashMap<Long,
Set<Tablet>>());
+ Map<Long, Set<Tablet>> beToTabletsOfTable =
beToTabletsInTable.get(tableId);
+ beToTabletsOfTable.putIfAbsent(be, new HashSet<Tablet>());
beToTabletsOfTable.get(be).add(tablet);
// partition
- partToTablets.putIfAbsent(partId, new HashMap<Long, Map<Long,
List<Tablet>>>());
- Map<Long, Map<Long, List<Tablet>>> indexToTablets =
partToTablets.get(partId);
- indexToTablets.putIfAbsent(indexId, new HashMap<Long, List<Tablet>>());
- Map<Long, List<Tablet>> beToTabletsOfIndex =
indexToTablets.get(indexId);
- beToTabletsOfIndex.putIfAbsent(be, new ArrayList<Tablet>());
+ partToTablets.putIfAbsent(partId, new HashMap<Long, Map<Long,
Set<Tablet>>>());
+ Map<Long, Map<Long, Set<Tablet>>> indexToTablets =
partToTablets.get(partId);
+ indexToTablets.putIfAbsent(indexId, new HashMap<Long, Set<Tablet>>());
+ Map<Long, Set<Tablet>> beToTabletsOfIndex =
indexToTablets.get(indexId);
+ beToTabletsOfIndex.putIfAbsent(be, new HashSet<Tablet>());
beToTabletsOfIndex.get(be).add(tablet);
}
public void statRouteInfo() {
- ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobal = new
ConcurrentHashMap<Long, List<Tablet>>();
- ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobalInSecondary
- = new ConcurrentHashMap<Long, List<Tablet>>();
- ConcurrentHashMap<Long, List<Tablet>> tmpBeToColocateTabletsGlobal
- = new ConcurrentHashMap<Long, List<Tablet>>();
+ ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobal = new
ConcurrentHashMap<Long, Set<Tablet>>();
+ ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobalInSecondary
+ = new ConcurrentHashMap<Long, Set<Tablet>>();
+ ConcurrentHashMap<Long, Set<Tablet>> tmpBeToColocateTabletsGlobal
+ = new ConcurrentHashMap<Long, Set<Tablet>>();
- futureBeToTabletsGlobal = new HashMap<Long, List<Tablet>>();
+ futureBeToTabletsGlobal = new HashMap<Long, Set<Tablet>>();
- partitionToTablets = new HashMap<Long, Map<Long, Map<Long,
List<Tablet>>>>();
- futurePartitionToTablets = new HashMap<Long, Map<Long, Map<Long,
List<Tablet>>>>();
+ partitionToTablets = new HashMap<Long, Map<Long, Map<Long,
Set<Tablet>>>>();
+ futurePartitionToTablets = new HashMap<Long, Map<Long, Map<Long,
Set<Tablet>>>>();
- beToTabletsInTable = new HashMap<Long, Map<Long, List<Tablet>>>();
- futureBeToTabletsInTable = new HashMap<Long, Map<Long,
List<Tablet>>>();
+ beToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
+ futureBeToTabletsInTable = new HashMap<Long, Map<Long, Set<Tablet>>>();
loopCloudReplica((Database db, Table table, Partition partition,
MaterializedIndex index, String cluster) -> {
boolean isColocated =
Env.getCurrentColocateIndex().isColocateTable(table.getId());
@@ -694,8 +695,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
continue;
}
if (allBes.contains(beId)) {
- List<Tablet> colocateTablets =
-
tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new ArrayList<>());
+ Set<Tablet> colocateTablets =
+
tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new HashSet<>());
colocateTablets.add(tablet);
}
continue;
@@ -710,8 +711,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
Backend secondaryBe = replica.getSecondaryBackend(cluster);
long secondaryBeId = secondaryBe == null ? -1L :
secondaryBe.getId();
if (allBes.contains(secondaryBeId)) {
- List<Tablet> tablets = tmpBeToTabletsGlobalInSecondary
- .computeIfAbsent(secondaryBeId, k -> new
ArrayList<>());
+ Set<Tablet> tablets = tmpBeToTabletsGlobalInSecondary
+ .computeIfAbsent(secondaryBeId, k -> new
HashSet<>());
tablets.add(tablet);
}
@@ -764,10 +765,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
public void balanceInPartition(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
// balance all partition
- for (Map.Entry<Long, Map<Long, Map<Long, List<Tablet>>>>
partitionEntry : futurePartitionToTablets.entrySet()) {
- Map<Long, Map<Long, List<Tablet>>> indexToTablets =
partitionEntry.getValue();
+ for (Map.Entry<Long, Map<Long, Map<Long, Set<Tablet>>>> partitionEntry
: futurePartitionToTablets.entrySet()) {
+ Map<Long, Map<Long, Set<Tablet>>> indexToTablets =
partitionEntry.getValue();
// balance all index of a partition
- for (Map.Entry<Long, Map<Long, List<Tablet>>> entry :
indexToTablets.entrySet()) {
+ for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry :
indexToTablets.entrySet()) {
// balance a index
balanceImpl(bes, clusterId, entry.getValue(),
BalanceType.PARTITION, infos);
}
@@ -776,7 +777,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
public void balanceInTable(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
// balance all tables
- for (Map.Entry<Long, Map<Long, List<Tablet>>> entry :
futureBeToTabletsInTable.entrySet()) {
+ for (Map.Entry<Long, Map<Long, Set<Tablet>>> entry :
futureBeToTabletsInTable.entrySet()) {
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE,
infos);
}
}
@@ -846,10 +847,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
return null;
}
- private void updateBeToTablets(Tablet pickedTablet, long srcBe, long
destBe, BalanceType balanceType,
- Map<Long, List<Tablet>> globalBeToTablets,
- Map<Long, Map<Long, List<Tablet>>> beToTabletsInTable,
- Map<Long, Map<Long, Map<Long, List<Tablet>>>> partToTablets) {
+ private void updateBeToTablets(Tablet pickedTablet, long srcBe, long
destBe,
+ Map<Long, Set<Tablet>> globalBeToTablets,
+ Map<Long, Map<Long, Set<Tablet>>> beToTabletsInTable,
+ Map<Long, Map<Long, Map<Long, Set<Tablet>>>> partToTablets) {
CloudReplica replica = (CloudReplica)
pickedTablet.getReplicas().get(0);
long tableId = replica.getTableId();
long partId = replica.getPartitionId();
@@ -892,47 +893,73 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
}
- private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>>
beToTablets, long avgNum,
+ private boolean getTransferPair(List<Long> bes, Map<Long, Set<Tablet>>
beToTablets, long avgNum,
TransferPairInfo pairInfo) {
- long destBe = bes.get(0);
- long srcBe = bes.get(0);
+ long srcBe = findSourceBackend(bes, beToTablets);
+ long destBe = findDestinationBackend(bes, beToTablets, srcBe);
- long minTabletsNum = Long.MAX_VALUE;
+ if (srcBe == -1 || destBe == -1) {
+ return false; // No valid backend found
+ }
+
+ long minTabletsNum = beToTablets.get(destBe) == null ? 0 :
beToTablets.get(destBe).size();
+ long maxTabletsNum = beToTablets.get(srcBe) == null ? 0 :
beToTablets.get(srcBe).size();
+
+ if (!isTransferValid(srcBe, minTabletsNum, maxTabletsNum, avgNum)) {
+ return false; // Transfer conditions not met
+ }
+
+ pairInfo.srcBe = srcBe;
+ pairInfo.destBe = destBe;
+ pairInfo.minTabletsNum = minTabletsNum;
+ pairInfo.maxTabletsNum = maxTabletsNum;
+ return true;
+ }
+
+ private long findSourceBackend(List<Long> bes, Map<Long, Set<Tablet>>
beToTablets) {
+ long srcBe = -1;
long maxTabletsNum = 0;
- boolean srcDecommissioned = false;
for (Long be : bes) {
- long tabletNum = beToTablets.get(be) == null ? 0 :
beToTablets.get(be).size();
- if (tabletNum > maxTabletsNum) {
- srcBe = be;
- maxTabletsNum = tabletNum;
- }
-
+ long tabletNum = beToTablets.getOrDefault(be,
Collections.emptySet()).size();
Backend backend = cloudSystemInfoService.getBackend(be);
- if (backend == null) {
+
+ // Check if the backend is decommissioned
+ if (backend != null) {
+ if (backend.isDecommissioning() && tabletNum > 0) {
+ srcBe = be; // Mark as source if decommissioned and has
tablets
+ break; // Exit early if we found a decommissioned backend
+ }
+ if (!backend.isDecommissioning() && tabletNum > maxTabletsNum)
{
+ srcBe = be;
+ maxTabletsNum = tabletNum;
+ }
+ } else {
LOG.info("backend {} not found", be);
- continue;
- }
- if (tabletNum < minTabletsNum && backend.isAlive() &&
!backend.isDecommissioning()
- && !backend.isSmoothUpgradeSrc()) {
- destBe = be;
- minTabletsNum = tabletNum;
}
}
+ return srcBe;
+ }
+
+ private long findDestinationBackend(List<Long> bes, Map<Long, Set<Tablet>>
beToTablets, long srcBe) {
+ long destBe = -1;
+ long minTabletsNum = Long.MAX_VALUE;
for (Long be : bes) {
- long tabletNum = beToTablets.get(be) == null ? 0 :
beToTablets.get(be).size();
+ long tabletNum = beToTablets.getOrDefault(be,
Collections.emptySet()).size();
Backend backend = cloudSystemInfoService.getBackend(be);
- if (backend == null) {
- LOG.info("backend {} not found", be);
- continue;
- }
- if (backend.isDecommissioning() && tabletNum > 0) {
- srcBe = be;
- srcDecommissioned = true;
- break;
+ if (backend != null && backend.isAlive() &&
!backend.isDecommissioning() && !backend.isSmoothUpgradeSrc()) {
+ if (tabletNum < minTabletsNum) {
+ destBe = be;
+ minTabletsNum = tabletNum;
+ }
}
}
+ return destBe;
+ }
+
+ private boolean isTransferValid(long srcBe, long minTabletsNum, long
maxTabletsNum, long avgNum) {
+ boolean srcDecommissioned =
cloudSystemInfoService.getBackend(srcBe).isDecommissioning();
if (!srcDecommissioned) {
if ((maxTabletsNum < avgNum * (1 +
Config.cloud_rebalance_percent_threshold)
@@ -941,142 +968,183 @@ public class CloudTabletRebalancer extends MasterDaemon
{
return false;
}
}
-
- pairInfo.srcBe = srcBe;
- pairInfo.destBe = destBe;
- pairInfo.minTabletsNum = minTabletsNum;
- pairInfo.maxTabletsNum = maxTabletsNum;
return true;
}
private boolean isConflict(long srcBe, long destBe, CloudReplica
cloudReplica, BalanceType balanceType,
- Map<Long, Map<Long, Map<Long, List<Tablet>>>> beToTabletsInParts,
- Map<Long, Map<Long, List<Tablet>>> beToTabletsInTables) {
- if (balanceType == balanceType.GLOBAL) {
- // check is conflict with partition balance
- long maxBeSize =
beToTabletsInParts.get(cloudReplica.getPartitionId())
- .get(cloudReplica.getIndexId()).get(srcBe).size();
- List<Tablet> destBeTablets =
beToTabletsInParts.get(cloudReplica.getPartitionId())
- .get(cloudReplica.getIndexId()).get(destBe);
- long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
- if (minBeSize >= maxBeSize) {
- return true;
- }
-
- // check is conflict with table balance
- maxBeSize =
beToTabletsInTables.get(cloudReplica.getTableId()).get(srcBe).size();
- destBeTablets =
beToTabletsInTables.get(cloudReplica.getTableId()).get(destBe);
- minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
- if (minBeSize >= maxBeSize) {
- return true;
- }
+ Map<Long, Map<Long, Map<Long, Set<Tablet>>>>
beToTabletsInParts,
+ Map<Long, Map<Long, Set<Tablet>>>
beToTabletsInTables) {
+ if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()) {
+ return false; // If source BE is decommissioned, no conflict
}
- if (balanceType == balanceType.TABLE) {
- // check is conflict with partition balance
- long maxBeSize =
beToTabletsInParts.get(cloudReplica.getPartitionId())
- .get(cloudReplica.getIndexId()).get(srcBe).size();
- List<Tablet> destBeTablets =
beToTabletsInParts.get(cloudReplica.getPartitionId())
- .get(cloudReplica.getIndexId()).get(destBe);
- long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
- return minBeSize >= maxBeSize;
+ if (balanceType == BalanceType.GLOBAL) {
+ return checkGlobalBalanceConflict(srcBe, destBe, cloudReplica,
beToTabletsInParts, beToTabletsInTables);
+ } else if (balanceType == BalanceType.TABLE) {
+ return checkTableBalanceConflict(srcBe, destBe, cloudReplica,
beToTabletsInParts);
}
return false;
}
- private void balanceImpl(List<Long> bes, String clusterId, Map<Long,
List<Tablet>> beToTablets,
+ private boolean checkGlobalBalanceConflict(long srcBe, long destBe,
CloudReplica cloudReplica,
+ Map<Long, Map<Long, Map<Long,
Set<Tablet>>>> beToTabletsInParts,
+ Map<Long, Map<Long,
Set<Tablet>>> beToTabletsInTables) {
+ long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica,
beToTabletsInParts);
+ long minBeSize = getTabletSizeInParts(destBe, cloudReplica,
beToTabletsInParts);
+
+ if (minBeSize >= maxBeSize) {
+ return true; // Conflict detected
+ }
+
+ maxBeSize = getTabletSizeInBes(srcBe, cloudReplica,
beToTabletsInTables);
+ minBeSize = getTabletSizeInBes(destBe, cloudReplica,
beToTabletsInTables);
+
+ return minBeSize >= maxBeSize; // Conflict detected
+ }
+
+ private boolean checkTableBalanceConflict(long srcBe, long destBe,
CloudReplica cloudReplica,
+ Map<Long, Map<Long, Map<Long,
Set<Tablet>>>> beToTabletsInParts) {
+ long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica,
beToTabletsInParts);
+ long minBeSize = getTabletSizeInParts(destBe, cloudReplica,
beToTabletsInParts);
+
+ return minBeSize >= maxBeSize; // Conflict detected
+ }
+
+ private long getTabletSizeInParts(long beId, CloudReplica cloudReplica,
+ Map<Long, Map<Long, Map<Long,
Set<Tablet>>>> beToTabletsInParts) {
+ Set<Tablet> tablets =
beToTabletsInParts.get(cloudReplica.getPartitionId())
+ .get(cloudReplica.getIndexId()).get(beId);
+ return tablets == null ? 0 : tablets.size();
+ }
+
+ private long getTabletSizeInBes(long beId, CloudReplica cloudReplica,
+ Map<Long, Map<Long, Set<Tablet>>>
beToTabletsInTables) {
+ Set<Tablet> tablets =
beToTabletsInTables.get(cloudReplica.getTableId()).get(beId);
+ return tablets == null ? 0 : tablets.size();
+ }
+
+
+ private void balanceImpl(List<Long> bes, String clusterId, Map<Long,
Set<Tablet>> beToTablets,
BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
if (bes == null || bes.isEmpty() || beToTablets == null ||
beToTablets.isEmpty()) {
return;
}
- long totalTabletsNum = 0;
- long beNum = 0;
- for (Long be : bes) {
- long tabletNum = beToTablets.get(be) == null ? 0 :
beToTablets.get(be).size();
- Backend backend = cloudSystemInfoService.getBackend(be);
- if (backend != null && !backend.isDecommissioning()) {
- beNum++;
- }
- totalTabletsNum += tabletNum;
- }
+ long totalTabletsNum = calculateTotalTablets(bes, beToTablets);
+ long beNum = countActiveBackends(bes);
+
if (beNum == 0) {
LOG.warn("zero be, but want balance, skip");
return;
}
+
long avgNum = totalTabletsNum / beNum;
- long transferNum = Math.max(Math.round(avgNum *
Config.cloud_balance_tablet_percent_per_run),
-
Config.cloud_min_balance_tablet_num_per_run);
+ long transferNum = calculateTransferNum(avgNum);
for (int i = 0; i < transferNum; i++) {
TransferPairInfo pairInfo = new TransferPairInfo();
if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) {
- // no need balance;
- break;
- }
-
- if (balanceType == balanceType.PARTITION) {
- indexBalanced = false;
+ break; // no need balance
}
- if (balanceType == balanceType.TABLE) {
- tableBalanced = false;
- }
+ updateBalanceStatus(balanceType);
long srcBe = pairInfo.srcBe;
long destBe = pairInfo.destBe;
- long minTabletsNum = pairInfo.minTabletsNum;
- long maxTabletsNum = pairInfo.maxTabletsNum;
- int randomIndex = rand.nextInt(beToTablets.get(srcBe).size());
- Tablet pickedTablet = beToTablets.get(srcBe).get(randomIndex);
- CloudReplica cloudReplica = (CloudReplica)
pickedTablet.getReplicas().get(0);
+ Tablet pickedTablet = pickRandomTablet(beToTablets.get(srcBe));
+ if (pickedTablet == null) {
+ continue; // No tablet to pick
+ }
+ CloudReplica cloudReplica = (CloudReplica)
pickedTablet.getReplicas().get(0);
Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);
- // if srcBe is dead, destBe cann't download cache from it,
preheating will failed
+
if (Config.enable_cloud_warm_up_for_rebalance && srcBackend !=
null && srcBackend.isAlive()) {
- if (isConflict(srcBe, destBe, cloudReplica, balanceType,
futurePartitionToTablets,
- futureBeToTabletsInTable)) {
+ if (isConflict(srcBe, destBe, cloudReplica, balanceType,
+ futurePartitionToTablets, futureBeToTabletsInTable)) {
continue;
}
-
- try {
- sendPreHeatingRpc(pickedTablet, srcBe, destBe);
- } catch (Exception e) {
- break;
- }
-
- InfightTask task = new InfightTask();
- task.pickedTablet = pickedTablet;
- task.srcBe = srcBe;
- task.destBe = destBe;
- task.balanceType = balanceType;
- task.beToTablets = beToTablets;
- task.startTimestamp = System.currentTimeMillis() / 1000;
- tabletToInfightTask.put(new
InfightTablet(pickedTablet.getId(), clusterId), task);
-
- LOG.info("pre cache {} from {} to {}, cluster {} minNum {}
maxNum {} beNum {} tabletsNum {}, part {}",
- pickedTablet.getId(), srcBe, destBe, clusterId,
- minTabletsNum, maxTabletsNum, beNum, totalTabletsNum,
cloudReplica.getPartitionId());
- updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
- futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
+ preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId,
balanceType, beToTablets);
} else {
if (isConflict(srcBe, destBe, cloudReplica, balanceType,
partitionToTablets, beToTabletsInTable)) {
continue;
}
+ transferTablet(pickedTablet, srcBe, destBe, clusterId,
balanceType, infos);
+ }
+ }
+ }
- LOG.info("transfer {} from {} to {}, cluster {} minNum {}
maxNum {} beNum {} tabletsNum {}, part {}",
- pickedTablet.getId(), srcBe, destBe, clusterId,
- minTabletsNum, maxTabletsNum, beNum, totalTabletsNum,
cloudReplica.getPartitionId());
+ private long calculateTotalTablets(List<Long> bes, Map<Long, Set<Tablet>>
beToTablets) {
+ return bes.stream()
+ .mapToLong(be -> beToTablets.getOrDefault(be,
Collections.emptySet()).size())
+ .sum();
+ }
- updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
beToTabletsGlobal,
- beToTabletsInTable, partitionToTablets);
- updateBeToTablets(pickedTablet, srcBe, destBe, balanceType,
- futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
- updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
- }
+ private long countActiveBackends(List<Long> bes) {
+ return bes.stream()
+ .filter(be -> {
+ Backend backend = cloudSystemInfoService.getBackend(be);
+ return backend != null && !backend.isDecommissioning();
+ })
+ .count();
+ }
+
+ private long calculateTransferNum(long avgNum) {
+ return Math.max(Math.round(avgNum *
Config.cloud_balance_tablet_percent_per_run),
+ Config.cloud_min_balance_tablet_num_per_run);
+ }
+
+ private void updateBalanceStatus(BalanceType balanceType) {
+ if (balanceType == BalanceType.PARTITION) {
+ indexBalanced = false;
+ } else if (balanceType == BalanceType.TABLE) {
+ tableBalanced = false;
+ }
+ }
+
+ private Tablet pickRandomTablet(Set<Tablet> tablets) {
+ if (tablets.isEmpty()) {
+ return null;
+ }
+ int randomIndex = rand.nextInt(tablets.size());
+ return tablets.stream().skip(randomIndex).findFirst().orElse(null);
+ }
+
+ private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long
destBe, String clusterId,
+ BalanceType balanceType, Map<Long,
Set<Tablet>> beToTablets) {
+ try {
+ sendPreHeatingRpc(pickedTablet, srcBe, destBe);
+ } catch (Exception e) {
+ LOG.warn("Failed to preheat tablet {} from {} to {}, "
+ + "help msg turn off fe config
enable_cloud_warm_up_for_rebalance",
+ pickedTablet.getId(), srcBe, destBe, e);
+ return;
}
+
+ InfightTask task = new InfightTask();
+ task.pickedTablet = pickedTablet;
+ task.srcBe = srcBe;
+ task.destBe = destBe;
+ task.balanceType = balanceType;
+ task.beToTablets = beToTablets;
+ task.startTimestamp = System.currentTimeMillis() / 1000;
+ tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(),
clusterId), task);
+
+ LOG.info("pre cache {} from {} to {}, cluster {}",
pickedTablet.getId(), srcBe, destBe, clusterId);
+ updateBeToTablets(pickedTablet, srcBe, destBe,
+ futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
+ }
+
+ private void transferTablet(Tablet pickedTablet, long srcBe, long destBe,
String clusterId,
+ BalanceType balanceType,
List<UpdateCloudReplicaInfo> infos) {
+ LOG.info("transfer {} from {} to {}, cluster {}",
pickedTablet.getId(), srcBe, destBe, clusterId);
+ updateBeToTablets(pickedTablet, srcBe, destBe,
+ beToTabletsGlobal, beToTabletsInTable, partitionToTablets);
+ updateBeToTablets(pickedTablet, srcBe, destBe,
+ futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
+ updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
}
public void addTabletMigrationTask(Long srcBe, Long dstBe) {
@@ -1088,7 +1156,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
*/
private void migrateTablets(Long srcBe, Long dstBe) {
// get tablets
- List<Tablet> tablets = beToTabletsGlobal.get(srcBe);
+ Set<Tablet> tablets = beToTabletsGlobal.get(srcBe);
if (tablets == null || tablets.isEmpty()) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set
inactive", srcBe);
((CloudEnv)
Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
@@ -1224,6 +1292,4 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
return rets;
}
-
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]