This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 32f52517449 [fix](cloud) Periodically cleaning secondary be in cloud used by redundant tablets (#50200) 32f52517449 is described below commit 32f5251744919b1514d000ebf31de067a9a9b06e Author: deardeng <deng...@selectdb.com> AuthorDate: Wed May 14 21:22:57 2025 +0800 [fix](cloud) Periodically cleaning secondary be in cloud used by redundant tablets (#50200) In cloud mode, the secondary BE map temporarily stores the tablet-to-BE mapping that is created when a tablet is rehashed because its BE is abnormal. If this mapping is not cleaned up, the diff logic that cleans redundant tablets on tablet report does not work as expected. --- .../apache/doris/cloud/catalog/CloudReplica.java | 44 +++++++++-- .../doris/cloud/catalog/CloudTabletRebalancer.java | 2 + .../test_clean_tablet_when_rebalance.groovy | 89 +++++++++++++++++----- 3 files changed, 108 insertions(+), 27 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index f27b9fdf0cc..358fc1023b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -26,6 +26,7 @@ import org.apache.doris.cloud.qe.ComputeGroupException; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.qe.ConnectContext; @@ -71,7 +72,8 @@ public class CloudReplica extends Replica { private Map<String, List<Long>> memClusterToBackends = new ConcurrentHashMap<String, List<Long>>(); // clusterId, secondaryBe, changeTimestamp - private Map<String, List<Long>> secondaryClusterToBackends = new ConcurrentHashMap<String, List<Long>>(); + private Map<String, Pair<Long, 
Long>> secondaryClusterToBackends + = new ConcurrentHashMap<String, Pair<Long, Long>>(); public CloudReplica() { } @@ -369,13 +371,17 @@ public class CloudReplica extends Replica { } public Backend getSecondaryBackend(String clusterId) { - List<Long> backendIds = secondaryClusterToBackends.get(clusterId); - if (backendIds == null || backendIds.isEmpty()) { + Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId); + if (secondBeAndChangeTimestamp == null) { return null; } - - long backendId = backendIds.get(0); - return Env.getCurrentSystemInfo().getBackend(backendId); + long beId = secondBeAndChangeTimestamp.key(); + long changeTimestamp = secondBeAndChangeTimestamp.value(); + if (LOG.isDebugEnabled()) { + LOG.debug("in secondaryClusterToBackends clusterId {}, beId {}, changeTimestamp {}, replica info {}", + clusterId, beId, changeTimestamp, this); + } + return Env.getCurrentSystemInfo().getBackend(secondBeAndChangeTimestamp.first); } public long hashReplicaToBe(String clusterId, boolean isBackGround) throws ComputeGroupException { @@ -583,11 +589,35 @@ public class CloudReplica extends Replica { } private void updateClusterToSecondaryBe(String cluster, long beId) { - secondaryClusterToBackends.put(cluster, Lists.newArrayList(beId)); + long changeTimestamp = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("add to secondary clusterId {}, beId {}, changeTimestamp {}, replica info {}", + cluster, beId, changeTimestamp, this); + } + secondaryClusterToBackends.put(cluster, Pair.of(beId, changeTimestamp)); } public void clearClusterToBe(String cluster) { primaryClusterToBackends.remove(cluster); secondaryClusterToBackends.remove(cluster); } + + // ATTN: This func is only used by redundant tablet report clean in bes. + // Only the master node will do the diff logic, + // so just only need to clean up secondaryClusterToBackends on the master node. 
+ public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimestamp) { + Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId); + if (secondBeAndChangeTimestamp == null) { + return; + } + long beId = secondBeAndChangeTimestamp.key(); + long changeTimestamp = secondBeAndChangeTimestamp.value(); + + if (changeTimestamp < expireTimestamp) { + LOG.debug("remove clusterId {} secondary beId {} changeTimestamp {} expireTimestamp {} replica info {}", + clusterId, beId, changeTimestamp, expireTimestamp, this); + secondaryClusterToBackends.remove(clusterId); + return; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 6eb5567f7e8..b93d4fe2cff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -548,6 +548,8 @@ public class CloudTabletRebalancer extends MasterDaemon { for (Tablet tablet : index.getTablets()) { for (Replica r : tablet.getReplicas()) { CloudReplica replica = (CloudReplica) r; + // clean secondary map + replica.checkAndClearSecondaryClusterToBe(cluster, needRehashDeadTime); InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster); // colocate table no need to update primary backends if (isColocated) { diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy index 151de976a83..caac5b73cfd 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -44,15 +44,36 @@ suite('test_clean_tablet_when_rebalance', 'docker') { def choseDeadBeIndex = 1 def table = "test_clean_tablet_when_rebalance" - 
def testCase = { deadTime, mergedCacheDir -> - boolean beDeadLong = deadTime > rehashTime ? true : false - logger.info("begin exec beDeadLong {}", beDeadLong) - + def selectTriggerRehash = { -> for (int i = 0; i < 5; i++) { sql """ select count(*) from $table """ } + } + + def getTabletInHostFromBe = { bes -> + def ret = [:] + bes.each { be -> + // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} + def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data + def tablets = data.tablets.collect { it.tablet_id as String } + tablets.each { + if (ret[it] != null) { + ret[it].add(data.host) + } else { + ret[it] = [data.host] + } + } + } + ret + } + + def testCase = { deadTime, mergedCacheDir -> + boolean beDeadLong = deadTime > rehashTime ? true : false + logger.info("begin exec beDeadLong {}", beDeadLong) + + selectTriggerRehash.call() def beforeGetFromFe = getTabletAndBeHostFromFe(table) def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) @@ -64,38 +85,65 @@ suite('test_clean_tablet_when_rebalance', 'docker') { cluster.stopBackends(choseDeadBeIndex) dockerAwaitUntil(50) { - def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") - .collect { it.BackendId } - .unique() - logger.info("bes {}", bes) + def showTablets = sql_return_maparray("SHOW TABLETS FROM ${table}") + def bes = showTablets + .collect { it.BackendId } + .unique() + logger.info("before start bes {}, tablets {}", bes, showTablets) bes.size() == 2 } + // rehash + selectTriggerRehash.call() + // curl be, tablets in 2 bes if (beDeadLong) { setFeConfig('enable_cloud_partition_balance', false) setFeConfig('enable_cloud_table_balance', false) setFeConfig('enable_cloud_global_balance', false) } - sleep(deadTime * 1000) + // wait report logic + sleep(deadTime * 1000) cluster.startBackends(choseDeadBeIndex) + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def 
afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + logger.info("after stop one be, rehash fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) dockerAwaitUntil(50) { - def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") + def showTablets = sql_return_maparray("SHOW TABLETS FROM ${table}") + def bes = showTablets .collect { it.BackendId } .unique() - logger.info("bes {}", bes) + logger.info("after start bes {}, tablets {}", bes, showTablets) bes.size() == (beDeadLong ? 2 : 3) } - for (int i = 0; i < 5; i++) { - sleep(2000) - sql """ - select count(*) from $table - """ + + selectTriggerRehash.call() + // wait report logic + // tablet report clean not work, before sleep, in fe secondary not been clear + afterGetFromFe = getTabletAndBeHostFromFe(table) + afterGetFromBe = getTabletInHostFromBe(cluster.getAllBackends()) + logger.info("before sleep rehash time, fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) + def redundancyTablet = null + afterGetFromFe.each { + assertTrue(afterGetFromBe.containsKey(it.Key)) + if (afterGetFromBe[it.Key].size() == 2) { + redundancyTablet = it.Key + logger.info("find tablet {} redundancy in {}", it.Key, afterGetFromBe[it.Key]) + } + assertTrue(afterGetFromBe[it.Key].contains(it.Value[1])) } - def afterGetFromFe = getTabletAndBeHostFromFe(table) - def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) - logger.info("after fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) + + sleep(rehashTime * 1000 + 10 * 1000) + // tablet report clean will work, after sleep, in fe secondary been clear + + afterGetFromFe = getTabletAndBeHostFromFe(table) + afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + if (!beDeadLong) { + def checkAfterGetFromBe = getTabletInHostFromBe(cluster.getAllBackends()) + assertEquals(1, checkAfterGetFromBe[redundancyTablet].size()) + } + logger.info("after sleep rehash time, fe tablets {}, be tablets {}", afterGetFromFe, 
afterGetFromBe) afterGetFromFe.each { assertTrue(afterGetFromBe.containsKey(it.Key)) assertEquals(afterGetFromBe[it.Key], it.Value[1]) @@ -163,7 +211,8 @@ suite('test_clean_tablet_when_rebalance', 'docker') { ); """ sql """ - insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3') + insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3'), + (4, 4,'v4'), (5, 5, 'v5'), (6, 6, 'v6'), (100, 100, 'v100'), (7, 7, 'v7') """ def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) // 'rehash_tablet_after_be_dead_seconds=100' --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org