This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2f3df2d0ef3b1af73957c78ba8030bf1af7880e3
Author: deardeng <565620...@qq.com>
AuthorDate: Fri Sep 6 19:10:17 2024 +0800

    [enhance](cloud) Add a policy to delay switching a tablet to a new BE when its current BE is abnormal (#40371)
---
 .../main/java/org/apache/doris/common/Config.java  |  3 +
 .../apache/doris/cloud/catalog/CloudReplica.java   | 79 +++++++++++++++-------
 .../doris/cloud/catalog/CloudTabletRebalancer.java | 21 +++---
 .../cloud/datasource/CloudInternalCatalog.java     |  4 +-
 .../cloud_p0/multi_cluster/test_rebalance.groovy   |  2 +-
 5 files changed, 71 insertions(+), 38 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index eb70a71a9c3..42affa36daf 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3014,6 +3014,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, description = {"存算分离模式下是否开启大事务提交,默认false"})
     public static boolean enable_cloud_txn_lazy_commit = false;
 
+    @ConfField(mutable = true, description = 
{"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认true"})
+    public static boolean enable_immediate_be_assign = true;
+
     // ATTN: DONOT add any config not related to cloud mode here
     // ATTN: DONOT add any config not related to cloud mode here
     // ATTN: DONOT add any config not related to cloud mode here
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index be0c510559e..43f7dcbc687 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -20,7 +20,6 @@ package org.apache.doris.cloud.catalog;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.Replica.ReplicaContext;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -49,7 +48,7 @@ public class CloudReplica extends Replica {
 
     // In the future, a replica may be mapped to multiple BEs in a cluster,
     // so this value is be list
-    private Map<String, List<Long>> clusterToBackends = new 
ConcurrentHashMap<String, List<Long>>();
+    private Map<String, List<Long>> primaryClusterToBackends = new 
ConcurrentHashMap<String, List<Long>>();
     @SerializedName(value = "dbId")
     private long dbId = -1;
     @SerializedName(value = "tableId")
@@ -65,6 +64,9 @@ public class CloudReplica extends Replica {
 
     private Map<String, List<Long>> memClusterToBackends = new 
ConcurrentHashMap<String, List<Long>>();
 
+    // clusterId, secondaryBe, changeTimestamp
+    private Map<String, List<Long>> secondaryClusterToBackends = new 
ConcurrentHashMap<String, List<Long>>();
+
     public CloudReplica() {
     }
 
@@ -186,8 +188,8 @@ public class CloudReplica extends Replica {
                 backendId = memClusterToBackends.get(clusterId).get(indexRand);
             }
 
-            if (!replicaEnough && !allowColdRead && 
clusterToBackends.containsKey(clusterId)) {
-                backendId = clusterToBackends.get(clusterId).get(0);
+            if (!replicaEnough && !allowColdRead && 
primaryClusterToBackends.containsKey(clusterId)) {
+                backendId = primaryClusterToBackends.get(clusterId).get(0);
             }
 
             if (backendId > 0) {
@@ -212,21 +214,47 @@ public class CloudReplica extends Replica {
             }
         }
 
-        if (clusterToBackends.containsKey(clusterId)) {
-            long backendId = clusterToBackends.get(clusterId).get(0);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
-            if (be != null && be.isQueryAvailable()) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("backendId={} ", backendId);
-                }
-                return backendId;
+        // use primaryClusterToBackends, if find be normal
+        long pickBeId = getAvaliableBeId(clusterId, primaryClusterToBackends);
+        if (pickBeId != -1) {
+            return pickBeId;
+        }
+
+        if (!Config.enable_immediate_be_assign) {
+            // use secondaryClusterToBackends, if find be normal
+            pickBeId = getAvaliableBeId(clusterId, secondaryClusterToBackends);
+            if (pickBeId != -1) {
+                return pickBeId;
             }
         }
-        if 
(DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.clusterToBackends")) {
-            LOG.info("Debug Point enable 
CloudReplica.getBackendIdImpl.clusterToBackends");
+
+        if 
(DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.primaryClusterToBackends"))
 {
+            LOG.info("Debug Point enable 
CloudReplica.getBackendIdImpl.primaryClusterToBackends");
+            return -1;
+        }
+
+        // be abnormal, rehash it. configure settings to different maps
+        pickBeId = hashReplicaToBe(clusterId, false);
+        updateClusterToBe(clusterId, pickBeId, 
Config.enable_immediate_be_assign);
+        return pickBeId;
+    }
+
+    private long getAvaliableBeId(String clusterId, Map<String, List<Long>> 
clusterToBackends) {
+        List<Long> backendIds = clusterToBackends.get(clusterId);
+        if (backendIds == null || backendIds.isEmpty()) {
             return -1;
         }
-        return hashReplicaToBe(clusterId, false);
+
+        long backendId = backendIds.get(0);
+        Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+        if (be != null && be.isQueryAvailable()) {
+            // be normal
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("backendId={} ", backendId);
+            }
+            return backendId;
+        }
+        return -1;
     }
 
     public long hashReplicaToBe(String clusterId, boolean isBackGround) {
@@ -270,14 +298,10 @@ public class CloudReplica extends Replica {
                 pickedBeId, getId(), partitionId, availableBes.size(), idx, 
index,
                 hashCode == null ? -1 : hashCode.asLong());
 
-        // save to clusterToBackends map
-        List<Long> bes = new ArrayList<Long>();
-        bes.add(pickedBeId);
-        clusterToBackends.put(clusterId, bes);
-
         return pickedBeId;
     }
 
+
     public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, 
int replicaNum) {
         // TODO(luwei) list should be sorted
         List<Backend> clusterBes = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
@@ -375,7 +399,7 @@ public class CloudReplica extends Replica {
             long beId = in.readLong();
             List<Long> bes = new ArrayList<Long>();
             bes.add(beId);
-            clusterToBackends.put(clusterId, bes);
+            primaryClusterToBackends.put(clusterId, bes);
         }
     }
 
@@ -399,14 +423,19 @@ public class CloudReplica extends Replica {
         return idx;
     }
 
-    public Map<String, List<Long>> getClusterToBackends() {
-        return clusterToBackends;
+    public Map<String, List<Long>> getprimaryClusterToBackends() {
+        return primaryClusterToBackends;
     }
 
-    public void updateClusterToBe(String cluster, long beId) {
+    // save to primaryClusterToBackends or secondaryClusterToBackends map
+    public void updateClusterToBe(String cluster, long beId, boolean 
isUpdatePrimary) {
         // write lock
         List<Long> bes = new ArrayList<Long>();
         bes.add(beId);
-        clusterToBackends.put(cluster, bes);
+        if (isUpdatePrimary) {
+            primaryClusterToBackends.put(cluster, bes);
+        } else {
+            secondaryClusterToBackends.put(cluster, bes);
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 6f4534c4b08..ae34c6b722f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -499,23 +499,24 @@ public class CloudTabletRebalancer extends MasterDaemon {
             List<Long> tabletIds = new ArrayList<Long>();
             for (Tablet tablet : index.getTablets()) {
                 for (Replica replica : tablet.getReplicas()) {
-                    Map<String, List<Long>> clusterToBackends =
-                            ((CloudReplica) replica).getClusterToBackends();
-                    if (!clusterToBackends.containsKey(cluster)) {
+                    Map<String, List<Long>> primaryClusterToBackends =
+                            ((CloudReplica) 
replica).getprimaryClusterToBackends();
+                    if (!primaryClusterToBackends.containsKey(cluster)) {
                         long beId = ((CloudReplica) 
replica).hashReplicaToBe(cluster, true);
+                        ((CloudReplica) replica).updateClusterToBe(cluster, 
beId, true);
                         if (beId <= 0) {
                             assignedErrNum++;
                             continue;
                         }
                         List<Long> bes = new ArrayList<Long>();
                         bes.add(beId);
-                        clusterToBackends.put(cluster, bes);
+                        primaryClusterToBackends.put(cluster, bes);
 
                         assigned = true;
                         beIds.add(beId);
                         tabletIds.add(tablet.getId());
                     } else {
-                        beIds.add(clusterToBackends.get(cluster).get(0));
+                        
beIds.add(primaryClusterToBackends.get(cluster).get(0));
                         tabletIds.add(tablet.getId());
                     }
                 }
@@ -569,9 +570,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
         loopCloudReplica((Database db, Table table, Partition partition, 
MaterializedIndex index, String cluster) -> {
             for (Tablet tablet : index.getTablets()) {
                 for (Replica replica : tablet.getReplicas()) {
-                    Map<String, List<Long>> clusterToBackends =
-                            ((CloudReplica) replica).getClusterToBackends();
-                    for (Map.Entry<String, List<Long>> entry : 
clusterToBackends.entrySet()) {
+                    Map<String, List<Long>> primaryClusterToBackends =
+                            ((CloudReplica) 
replica).getprimaryClusterToBackends();
+                    for (Map.Entry<String, List<Long>> entry : 
primaryClusterToBackends.entrySet()) {
                         if (!cluster.equals(entry.getKey())) {
                             continue;
                         }
@@ -729,7 +730,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
     private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String 
clusterId,
                                       List<UpdateCloudReplicaInfo> infos) {
         CloudReplica cloudReplica = (CloudReplica) 
pickedTablet.getReplicas().get(0);
-        cloudReplica.updateClusterToBe(clusterId, destBe);
+        cloudReplica.updateClusterToBe(clusterId, destBe, true);
         Database db = 
Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
         if (db == null) {
             return;
@@ -973,7 +974,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             String clusterId = be.getCloudClusterId();
             String clusterName = be.getCloudClusterName();
             // update replica location info
-            cloudReplica.updateClusterToBe(clusterId, dstBe);
+            cloudReplica.updateClusterToBe(clusterId, dstBe, true);
             LOG.info("cloud be migrate tablet {} from srcBe={} to dstBe={}, 
clusterId={}, clusterName={}",
                     tablet.getId(), srcBe, dstBe, clusterId, clusterName);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index ef461c74b21..b7a932486ad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -975,7 +975,7 @@ public class CloudInternalCatalog extends InternalCatalog {
                     clusterId = realClusterId;
                 }
 
-                ((CloudReplica) replica).updateClusterToBe(clusterId, 
info.getBeId());
+                ((CloudReplica) replica).updateClusterToBe(clusterId, 
info.getBeId(), true);
 
                 LOG.debug("update single cloud replica cluster {} replica {} 
be {}", info.getClusterId(),
                         replica.getId(), info.getBeId());
@@ -1001,7 +1001,7 @@ public class CloudInternalCatalog extends InternalCatalog 
{
 
                     LOG.debug("update cloud replica cluster {} replica {} be 
{}", info.getClusterId(),
                             replica.getId(), info.getBeIds().get(i));
-                    ((CloudReplica) replica).updateClusterToBe(clusterId, 
info.getBeIds().get(i));
+                    ((CloudReplica) replica).updateClusterToBe(clusterId, 
info.getBeIds().get(i), true);
                 }
             }
         } catch (Exception e) {
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy 
b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
index e1735a4acd4..0aa2e83ccc2 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -63,7 +63,7 @@ suite('test_rebalance_in_cloud', 'multi_cluster') {
             PARTITION p1998 VALUES [("19980101"), ("19990101")))
             DISTRIBUTED BY HASH(k1) BUCKETS 3
         """
-        
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.clusterToBackends");
+        
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends");
         sql """set global forward_to_master=false"""
         
         // add a be


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to