This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8364e8a899c [fix](cloud) fix active-priority rebalancer starvation
(#60835)
8364e8a899c is described below
commit 8364e8a899c356057a94e7c5b9aa3cf9474928e2
Author: deardeng <[email protected]>
AuthorDate: Mon Mar 2 14:16:48 2026 +0800
[fix](cloud) fix active-priority rebalancer starvation (#60835)
Root cause:
- In active-priority mode, performBalancing() returns early whenever
active objects are still unbalanced. Under sustained hot-tablet pressure
that early return fires every round, so the INACTIVE_ONLY phase never
runs. Inactive partitions/tables are starved, and BE scale-out
rebalancing becomes very slow.
- runAfterCatalogReady() always called statRouteInfo() twice and rebuilt
active tablet ids every round. These full-route/stat rebuilds allocate
many temporary maps/sets, increasing young GC frequency.
Fix:
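- Run the second statRouteInfo() only when at least one smooth-upgrade
migration task was processed in this round.
migrateTabletsForSmoothUpgrade() now drains the task queue with
non-blocking poll() and returns whether any task was handled.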
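- Add a starvation breaker: after N consecutive active-unbalanced
rounds, force one INACTIVE_ONLY round and reset the counter. Global
balance is still skipped in a forced round; it runs only when the
active phase is balanced.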
- Refresh the active tablet snapshot at a configurable interval
(default 60s) instead of every round.
Config:
- cloud_active_tablet_ids_refresh_interval_second (default 60; values
below 1 are treated as 1)
- cloud_active_unbalanced_force_inactive_after_rounds (default 10;
<=0 disables the forced round)
Test:
- Add unit tests for the forced-inactive trigger, the active-snapshot
refresh interval (including the 1s clamp), and the no-op migration path
(empty task queue returns false).
---
.../main/java/org/apache/doris/common/Config.java | 15 +++++
.../doris/cloud/catalog/CloudTabletRebalancer.java | 72 +++++++++++++++++-----
.../cloud/catalog/CloudTabletRebalancerTest.java | 72 +++++++++++++++++++++-
3 files changed, 142 insertions(+), 17 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 45735468dac..c43b89c5184 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3497,6 +3497,21 @@ public class Config extends ConfigBase {
+ "Default 10000. <=0 disables TopN segmentation."})
public static int cloud_active_partition_scheduling_topn = 10000;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "活跃 tablet 优先调度开启时,active 集合刷新间隔(秒)。默认 60 秒,"
+ + "表示 60 秒内复用同一批 active tablet,避免每轮重算。",
+ "Refresh interval in seconds for the active-tablet snapshot when
active priority scheduling is enabled. "
+ + "Default 60 seconds. Reuses the same active-tablet set
within the interval."})
+ public static long cloud_active_tablet_ids_refresh_interval_second = 60L;
+
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "活跃 tablet 优先调度开启时,若 active 阶段连续 N 轮未达均衡,"
+ + "则强制执行一轮 inactive 阶段以避免长期饥饿。默认 10,<=0 表示关闭该强制机制。",
+ "When active priority scheduling is enabled and the active phase
remains unbalanced for N consecutive "
+ + "rounds, force one inactive phase round to avoid
long-term starvation. "
+ + "Default 10. <=0 disables this forced mechanism."})
+ public static int cloud_active_unbalanced_force_inactive_after_rounds = 10;
+
@ConfField(mutable = true, masterOnly = false)
public static String security_checker_class_name = "";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 8b1916eeb6a..6ecac0b89f0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -149,6 +149,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
private BalanceTypeEnum globalBalanceTypeEnum =
BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
private Set<Long> activeTabletIds = new HashSet<>();
+ private long lastActiveTabletIdsRefreshMs = 0L;
+ private int consecutiveActiveUnbalancedRounds = 0;
// cache for scheduling order in one daemon run (rebuilt in statRouteInfo)
// table/partition active count is computed from activeTabletIds
@@ -510,7 +512,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
LOG.info("cloud tablet rebalance begin");
long start = System.currentTimeMillis();
- activeTabletIds = getActiveTabletIds();
+ refreshActiveTabletIdsIfNeeded();
globalBalanceTypeEnum =
BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
buildClusterToBackendMap();
@@ -519,8 +521,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
statRouteInfo();
- migrateTabletsForSmoothUpgrade();
- statRouteInfo();
+ boolean migrated = migrateTabletsForSmoothUpgrade();
+ if (migrated) {
+ statRouteInfo();
+ }
indexBalanced = true;
tableBalanced = true;
@@ -564,18 +568,15 @@ public class CloudTabletRebalancer extends MasterDaemon {
LOG.info("cluster to backends {}", clusterToBes);
}
- private void migrateTabletsForSmoothUpgrade() {
+ private boolean migrateTabletsForSmoothUpgrade() {
+ boolean migrated = false;
Pair<Long, Long> pair;
- while (!tabletsMigrateTasks.isEmpty()) {
- try {
- pair = tabletsMigrateTasks.take();
- LOG.debug("begin tablets migration from be {} to be {}",
pair.first, pair.second);
- migrateTablets(pair.first, pair.second);
- } catch (InterruptedException e) {
- LOG.warn("migrate tablets failed", e);
- throw new RuntimeException(e);
- }
+ while ((pair = tabletsMigrateTasks.poll()) != null) {
+ LOG.debug("begin tablets migration from be {} to be {}",
pair.first, pair.second);
+ migrateTablets(pair.first, pair.second);
+ migrated = true;
}
+ return migrated;
}
private void performBalancing() {
@@ -585,6 +586,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
// to the same table or partition onto the same BE, while `partition`
scheduling later requires these tablets
// to be dispersed across different BEs, resulting in unnecessary
scheduling.
if (!Config.enable_cloud_active_tablet_priority_scheduling) {
+ consecutiveActiveUnbalancedRounds = 0;
// Legacy scheduling: schedule the full set.
if (Config.enable_cloud_partition_balance) {
balanceAllPartitionsByPhase(ActiveSchedulePhase.ALL);
@@ -621,10 +623,15 @@ public class CloudTabletRebalancer extends MasterDaemon {
activeIndexBalanced, activeTableBalanced,
activeBalanced, clusterToBes.size());
}
- if (!activeBalanced) {
+ boolean forceInactivePhase =
shouldForceInactivePhase(activeBalanced);
+ if (!activeBalanced && !forceInactivePhase) {
// Active objects are not balanced yet; skip phase2 to avoid
diluting scheduling budget.
return;
}
+ if (forceInactivePhase) {
+ LOG.info("active phase is still unbalanced for {} consecutive
rounds, force run INACTIVE_ONLY once",
+
Config.cloud_active_unbalanced_force_inactive_after_rounds);
+ }
// Phase 2: inactive (all - active), then global if enabled and
ready.
boolean phase2IndexBalanced = true;
@@ -635,12 +642,30 @@ public class CloudTabletRebalancer extends MasterDaemon {
if (Config.enable_cloud_table_balance && phase2IndexBalanced) {
phase2TableBalanced =
balanceAllTablesByPhase(ActiveSchedulePhase.INACTIVE_ONLY);
}
- if (Config.enable_cloud_global_balance && phase2IndexBalanced &&
phase2TableBalanced) {
+ if (Config.enable_cloud_global_balance && activeBalanced &&
phase2IndexBalanced && phase2TableBalanced) {
globalBalance();
}
}
}
+ private boolean shouldForceInactivePhase(boolean activeBalanced) {
+ if (activeBalanced) {
+ consecutiveActiveUnbalancedRounds = 0;
+ return false;
+ }
+ int forceAfterRounds =
Config.cloud_active_unbalanced_force_inactive_after_rounds;
+ if (forceAfterRounds <= 0) {
+ consecutiveActiveUnbalancedRounds = 0;
+ return false;
+ }
+ consecutiveActiveUnbalancedRounds++;
+ if (consecutiveActiveUnbalancedRounds < forceAfterRounds) {
+ return false;
+ }
+ consecutiveActiveUnbalancedRounds = 0;
+ return true;
+ }
+
private boolean balanceAllPartitionsByPhase(ActiveSchedulePhase phase) {
// Reuse existing "balanced" flags as a per-phase signal.
indexBalanced = true;
@@ -1682,7 +1707,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
maxTabletsNum = tabletNum;
}
} else {
- LOG.info("backend {} not found", be);
+ LOG.debug("backend {} not found", be);
}
}
return srcBe;
@@ -1939,6 +1964,21 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
}
+ private void refreshActiveTabletIdsIfNeeded() {
+ long nowMs = System.currentTimeMillis();
+ if (!shouldRefreshActiveTabletIds(nowMs)) {
+ return;
+ }
+ activeTabletIds = getActiveTabletIds();
+ lastActiveTabletIdsRefreshMs = nowMs;
+ }
+
+ private boolean shouldRefreshActiveTabletIds(long nowMs) {
+ long refreshIntervalSeconds = Math.max(1L,
Config.cloud_active_tablet_ids_refresh_interval_second);
+ long refreshIntervalMs =
TimeUnit.SECONDS.toMillis(refreshIntervalSeconds);
+ return lastActiveTabletIdsRefreshMs <= 0L || nowMs -
lastActiveTabletIdsRefreshMs >= refreshIntervalMs;
+ }
+
// Choose non-active (cold) tablet first to re-balance, to reduce impact
on hot tablets.
// Fallback to active/random if no cold tablet is available.
private Tablet pickTabletPreferCold(long srcBe, Set<Tablet> tablets,
Set<Long> activeTabletIds,
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
index e18bcabd184..e3710c4efd8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
@@ -43,16 +43,22 @@ import java.util.concurrent.ConcurrentHashMap;
public class CloudTabletRebalancerTest {
private boolean oldEnableActiveScheduling;
+ private long oldActiveTabletIdsRefreshIntervalSecond;
+ private int oldForceInactiveAfterRounds;
@BeforeEach
public void setUp() {
oldEnableActiveScheduling =
Config.enable_cloud_active_tablet_priority_scheduling;
+ oldActiveTabletIdsRefreshIntervalSecond =
Config.cloud_active_tablet_ids_refresh_interval_second;
+ oldForceInactiveAfterRounds =
Config.cloud_active_unbalanced_force_inactive_after_rounds;
Config.enable_cloud_active_tablet_priority_scheduling = true;
}
@AfterEach
public void tearDown() {
Config.enable_cloud_active_tablet_priority_scheduling =
oldEnableActiveScheduling;
+ Config.cloud_active_tablet_ids_refresh_interval_second =
oldActiveTabletIdsRefreshIntervalSecond;
+ Config.cloud_active_unbalanced_force_inactive_after_rounds =
oldForceInactiveAfterRounds;
}
private static class TestRebalancer extends CloudTabletRebalancer {
@@ -79,6 +85,13 @@ public class CloudTabletRebalancerTest {
f.set(obj, value);
}
+ @SuppressWarnings("unchecked")
+ private static <T> T getField(Object obj, String name) throws Exception {
+ Field f = CloudTabletRebalancer.class.getDeclaredField(name);
+ f.setAccessible(true);
+ return (T) f.get(obj);
+ }
+
@SuppressWarnings("unchecked")
private static <T> T invokePrivate(Object obj, String method, Class<?>[]
types, Object[] args) throws Exception {
Method m = CloudTabletRebalancer.class.getDeclaredMethod(method,
types);
@@ -236,6 +249,63 @@ public class CloudTabletRebalancerTest {
Assertions.assertEquals(200L, list.get(1).getKey());
Assertions.assertEquals(100L, list.get(2).getKey(), "Internal db
partition should be scheduled last");
}
-}
+ @Test
+ public void
testShouldForceInactivePhase_afterConsecutiveUnbalancedRounds() throws
Exception {
+ TestRebalancer r = new TestRebalancer();
+ Config.cloud_active_unbalanced_force_inactive_after_rounds = 3;
+
+ boolean forceRound1 = invokePrivate(r, "shouldForceInactivePhase",
+ new Class<?>[] {boolean.class}, new Object[] {false});
+ boolean forceRound2 = invokePrivate(r, "shouldForceInactivePhase",
+ new Class<?>[] {boolean.class}, new Object[] {false});
+ boolean forceRound3 = invokePrivate(r, "shouldForceInactivePhase",
+ new Class<?>[] {boolean.class}, new Object[] {false});
+
+ Assertions.assertFalse(forceRound1);
+ Assertions.assertFalse(forceRound2);
+ Assertions.assertTrue(forceRound3);
+ Assertions.assertEquals(0, (int) getField(r,
"consecutiveActiveUnbalancedRounds"));
+
+ boolean forceAfterBalanced = invokePrivate(r,
"shouldForceInactivePhase",
+ new Class<?>[] {boolean.class}, new Object[] {true});
+ Assertions.assertFalse(forceAfterBalanced);
+ Assertions.assertEquals(0, (int) getField(r,
"consecutiveActiveUnbalancedRounds"));
+ }
+
+ @Test
+ public void testShouldRefreshActiveTabletIds_respectsIntervalAndClamp()
throws Exception {
+ TestRebalancer r = new TestRebalancer();
+
+ Config.cloud_active_tablet_ids_refresh_interval_second = 60L;
+ setField(r, "lastActiveTabletIdsRefreshMs", 0L);
+ boolean firstRound = invokePrivate(r, "shouldRefreshActiveTabletIds",
+ new Class<?>[] {long.class}, new Object[] {1000L});
+ Assertions.assertTrue(firstRound);
+
+ setField(r, "lastActiveTabletIdsRefreshMs", 1000L);
+ boolean beforeInterval = invokePrivate(r,
"shouldRefreshActiveTabletIds",
+ new Class<?>[] {long.class}, new Object[] {60000L});
+ boolean atInterval = invokePrivate(r, "shouldRefreshActiveTabletIds",
+ new Class<?>[] {long.class}, new Object[] {61000L});
+ Assertions.assertFalse(beforeInterval);
+ Assertions.assertTrue(atInterval);
+
+ Config.cloud_active_tablet_ids_refresh_interval_second = 0L; // clamp
to 1s
+ setField(r, "lastActiveTabletIdsRefreshMs", 1000L);
+ boolean beforeClampInterval = invokePrivate(r,
"shouldRefreshActiveTabletIds",
+ new Class<?>[] {long.class}, new Object[] {1500L});
+ boolean atClampInterval = invokePrivate(r,
"shouldRefreshActiveTabletIds",
+ new Class<?>[] {long.class}, new Object[] {2000L});
+ Assertions.assertFalse(beforeClampInterval);
+ Assertions.assertTrue(atClampInterval);
+ }
+
+ @Test
+ public void testMigrateTabletsForSmoothUpgrade_emptyQueueReturnsFalse()
throws Exception {
+ TestRebalancer r = new TestRebalancer();
+ boolean migrated = invokePrivate(r, "migrateTabletsForSmoothUpgrade",
new Class<?>[] {}, new Object[] {});
+ Assertions.assertFalse(migrated);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]