This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 32f52517449 [fix](cloud) Periodically cleaning secondary be in cloud 
used by redundant tablets (#50200)
32f52517449 is described below

commit 32f5251744919b1514d000ebf31de067a9a9b06e
Author: deardeng <deng...@selectdb.com>
AuthorDate: Wed May 14 21:22:57 2025 +0800

    [fix](cloud) Periodically cleaning secondary be in cloud used by redundant 
tablets (#50200)
    
    In cloud mode, the secondary-BE map temporarily stores the BE-to-tablet
    mapping produced by rehashing when a BE becomes abnormal. If that map is
    not cleaned up, the diff logic that cleans redundant tablets from BE
    reports does not work as expected.
---
 .../apache/doris/cloud/catalog/CloudReplica.java   | 44 +++++++++--
 .../doris/cloud/catalog/CloudTabletRebalancer.java |  2 +
 .../test_clean_tablet_when_rebalance.groovy        | 89 +++++++++++++++++-----
 3 files changed, 108 insertions(+), 27 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index f27b9fdf0cc..358fc1023b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -26,6 +26,7 @@ import org.apache.doris.cloud.qe.ComputeGroupException;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.qe.ConnectContext;
@@ -71,7 +72,8 @@ public class CloudReplica extends Replica {
     private Map<String, List<Long>> memClusterToBackends = new 
ConcurrentHashMap<String, List<Long>>();
 
     // clusterId, secondaryBe, changeTimestamp
-    private Map<String, List<Long>> secondaryClusterToBackends = new 
ConcurrentHashMap<String, List<Long>>();
+    private Map<String, Pair<Long, Long>> secondaryClusterToBackends
+            = new ConcurrentHashMap<String, Pair<Long, Long>>();
 
     public CloudReplica() {
     }
@@ -369,13 +371,17 @@ public class CloudReplica extends Replica {
     }
 
     public Backend getSecondaryBackend(String clusterId) {
-        List<Long> backendIds = secondaryClusterToBackends.get(clusterId);
-        if (backendIds == null || backendIds.isEmpty()) {
+        Pair<Long, Long> secondBeAndChangeTimestamp = 
secondaryClusterToBackends.get(clusterId);
+        if (secondBeAndChangeTimestamp == null) {
             return null;
         }
-
-        long backendId = backendIds.get(0);
-        return Env.getCurrentSystemInfo().getBackend(backendId);
+        long beId = secondBeAndChangeTimestamp.key();
+        long changeTimestamp = secondBeAndChangeTimestamp.value();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("in secondaryClusterToBackends clusterId {}, beId {}, 
changeTimestamp {}, replica info {}",
+                    clusterId, beId, changeTimestamp, this);
+        }
+        return 
Env.getCurrentSystemInfo().getBackend(secondBeAndChangeTimestamp.first);
     }
 
     public long hashReplicaToBe(String clusterId, boolean isBackGround) throws 
ComputeGroupException {
@@ -583,11 +589,35 @@ public class CloudReplica extends Replica {
     }
 
     private void updateClusterToSecondaryBe(String cluster, long beId) {
-        secondaryClusterToBackends.put(cluster, Lists.newArrayList(beId));
+        long changeTimestamp = System.currentTimeMillis();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("add to secondary clusterId {}, beId {}, changeTimestamp 
{}, replica info {}",
+                    cluster, beId, changeTimestamp, this);
+        }
+        secondaryClusterToBackends.put(cluster, Pair.of(beId, 
changeTimestamp));
     }
 
     public void clearClusterToBe(String cluster) {
         primaryClusterToBackends.remove(cluster);
         secondaryClusterToBackends.remove(cluster);
     }
+
+    // ATTN: This func is only used by redundant tablet report clean in bes.
+    // Only the master node will do the diff logic,
+    // so just only need to clean up secondaryClusterToBackends on the master 
node.
+    public void checkAndClearSecondaryClusterToBe(String clusterId, long 
expireTimestamp) {
+        Pair<Long, Long> secondBeAndChangeTimestamp = 
secondaryClusterToBackends.get(clusterId);
+        if (secondBeAndChangeTimestamp == null) {
+            return;
+        }
+        long beId = secondBeAndChangeTimestamp.key();
+        long changeTimestamp = secondBeAndChangeTimestamp.value();
+
+        if (changeTimestamp < expireTimestamp) {
+            LOG.debug("remove clusterId {} secondary beId {} changeTimestamp 
{} expireTimestamp {} replica info {}",
+                    clusterId, beId, changeTimestamp, expireTimestamp, this);
+            secondaryClusterToBackends.remove(clusterId);
+            return;
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 6eb5567f7e8..b93d4fe2cff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -548,6 +548,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
             for (Tablet tablet : index.getTablets()) {
                 for (Replica r : tablet.getReplicas()) {
                     CloudReplica replica = (CloudReplica) r;
+                    // clean secondary map
+                    replica.checkAndClearSecondaryClusterToBe(cluster, 
needRehashDeadTime);
                     InfightTablet taskKey = new InfightTablet(tablet.getId(), 
cluster);
                     // colocate table no need to update primary backends
                     if (isColocated) {
diff --git 
a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
 
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
index 151de976a83..caac5b73cfd 100644
--- 
a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
+++ 
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
@@ -44,15 +44,36 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
     def choseDeadBeIndex = 1
     def table = "test_clean_tablet_when_rebalance"
 
-    def testCase = { deadTime, mergedCacheDir -> 
-        boolean beDeadLong = deadTime > rehashTime ? true : false
-        logger.info("begin exec beDeadLong {}", beDeadLong)
-
+    def selectTriggerRehash = { ->
         for (int i = 0; i < 5; i++) {
             sql """
                 select count(*) from $table
             """
         }
+    }
+
+    def getTabletInHostFromBe = { bes ->
+        def ret = [:]
+        bes.each { be ->
+            // 
{"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
+            def data = 
Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data
+            def tablets = data.tablets.collect { it.tablet_id as String }
+            tablets.each {
+                if (ret[it] != null) {
+                    ret[it].add(data.host)
+                } else {
+                    ret[it] = [data.host]
+                }
+            }
+        }
+        ret
+    }
+
+    def testCase = { deadTime, mergedCacheDir -> 
+        boolean beDeadLong = deadTime > rehashTime ? true : false
+        logger.info("begin exec beDeadLong {}", beDeadLong)
+
+        selectTriggerRehash.call()
 
         def beforeGetFromFe = getTabletAndBeHostFromFe(table)
         def beforeGetFromBe = 
getTabletAndBeHostFromBe(cluster.getAllBackends())
@@ -64,38 +85,65 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
 
         cluster.stopBackends(choseDeadBeIndex)
         dockerAwaitUntil(50) {
-            def bes = sql_return_maparray("SHOW TABLETS FROM ${table}")
-                    .collect { it.BackendId }
-                    .unique()
-            logger.info("bes {}", bes)
+            def showTablets = sql_return_maparray("SHOW TABLETS FROM ${table}")
+            def bes = showTablets
+                .collect { it.BackendId }
+                .unique()
+            logger.info("before start bes {}, tablets {}", bes, showTablets)
             bes.size() == 2
         }
+        // rehash
+        selectTriggerRehash.call()
+        // curl be, tablets in 2 bes
 
         if (beDeadLong) {
             setFeConfig('enable_cloud_partition_balance', false)
             setFeConfig('enable_cloud_table_balance', false)
             setFeConfig('enable_cloud_global_balance', false)
         }
-        sleep(deadTime * 1000)
 
+        // wait report logic
+        sleep(deadTime * 1000)
         cluster.startBackends(choseDeadBeIndex)
+        def afterGetFromFe = getTabletAndBeHostFromFe(table)
+        def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+        logger.info("after stop one be, rehash fe tablets {}, be tablets {}", 
afterGetFromFe, afterGetFromBe)
 
         dockerAwaitUntil(50) {
-           def bes = sql_return_maparray("SHOW TABLETS FROM ${table}")
+            def showTablets = sql_return_maparray("SHOW TABLETS FROM ${table}")
+            def bes = showTablets
                 .collect { it.BackendId }
                 .unique()
-            logger.info("bes {}", bes)
+            logger.info("after start bes {}, tablets {}", bes, showTablets)
             bes.size() == (beDeadLong ? 2 : 3)
         }
-        for (int i = 0; i < 5; i++) {
-            sleep(2000)
-            sql """
-                select count(*) from $table
-            """
+
+        selectTriggerRehash.call()
+        // wait report logic
+        // tablet report clean not work, before sleep, in fe secondary not 
been clear
+        afterGetFromFe = getTabletAndBeHostFromFe(table)
+        afterGetFromBe = getTabletInHostFromBe(cluster.getAllBackends())
+        logger.info("before sleep rehash time, fe tablets {}, be tablets {}", 
afterGetFromFe, afterGetFromBe)
+        def redundancyTablet = null
+        afterGetFromFe.each {
+            assertTrue(afterGetFromBe.containsKey(it.Key))
+            if (afterGetFromBe[it.Key].size() == 2) {
+                redundancyTablet = it.Key
+                logger.info("find tablet {} redundancy in {}", it.Key, 
afterGetFromBe[it.Key])
+            }
+            assertTrue(afterGetFromBe[it.Key].contains(it.Value[1]))
         }
-        def afterGetFromFe = getTabletAndBeHostFromFe(table)
-        def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
-        logger.info("after fe tablets {}, be tablets {}", afterGetFromFe, 
afterGetFromBe)
+
+        sleep(rehashTime * 1000 + 10 * 1000)
+        // tablet report clean will work, after sleep, in fe secondary been 
clear
+
+        afterGetFromFe = getTabletAndBeHostFromFe(table)
+        afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+        if (!beDeadLong) {
+            def checkAfterGetFromBe = 
getTabletInHostFromBe(cluster.getAllBackends())
+            assertEquals(1, checkAfterGetFromBe[redundancyTablet].size())
+        }
+        logger.info("after sleep rehash time, fe tablets {}, be tablets {}", 
afterGetFromFe, afterGetFromBe)
         afterGetFromFe.each {
             assertTrue(afterGetFromBe.containsKey(it.Key))
             assertEquals(afterGetFromBe[it.Key], it.Value[1])
@@ -163,7 +211,8 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
             );
         """
         sql """
-            insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3')
+            insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 
'v3'), 
+                (4, 4,'v4'), (5, 5, 'v5'), (6, 6, 'v6'), (100, 100, 'v100'), 
(7, 7, 'v7')
         """
         def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 
2)
         // 'rehash_tablet_after_be_dead_seconds=100'


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to