This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2f3df2d0ef3b1af73957c78ba8030bf1af7880e3 Author: deardeng <565620...@qq.com> AuthorDate: Fri Sep 6 19:10:17 2024 +0800 [enhance](cloud) Add a policy when be abnormal, tablet delay switch be (#40371) --- .../main/java/org/apache/doris/common/Config.java | 3 + .../apache/doris/cloud/catalog/CloudReplica.java | 79 +++++++++++++++------- .../doris/cloud/catalog/CloudTabletRebalancer.java | 21 +++--- .../cloud/datasource/CloudInternalCatalog.java | 4 +- .../cloud_p0/multi_cluster/test_rebalance.groovy | 2 +- 5 files changed, 71 insertions(+), 38 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index eb70a71a9c3..42affa36daf 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3014,6 +3014,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true, description = {"存算分离模式下是否开启大事务提交,默认false"}) public static boolean enable_cloud_txn_lazy_commit = false; + @ConfField(mutable = true, description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认true"}) + public static boolean enable_immediate_be_assign = true; + // ATTN: DONOT add any config not related to cloud mode here // ATTN: DONOT add any config not related to cloud mode here // ATTN: DONOT add any config not related to cloud mode here diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index be0c510559e..43f7dcbc687 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -20,7 +20,6 @@ package org.apache.doris.cloud.catalog; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; -import 
org.apache.doris.catalog.Replica.ReplicaContext; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -49,7 +48,7 @@ public class CloudReplica extends Replica { // In the future, a replica may be mapped to multiple BEs in a cluster, // so this value is be list - private Map<String, List<Long>> clusterToBackends = new ConcurrentHashMap<String, List<Long>>(); + private Map<String, List<Long>> primaryClusterToBackends = new ConcurrentHashMap<String, List<Long>>(); @SerializedName(value = "dbId") private long dbId = -1; @SerializedName(value = "tableId") @@ -65,6 +64,9 @@ public class CloudReplica extends Replica { private Map<String, List<Long>> memClusterToBackends = new ConcurrentHashMap<String, List<Long>>(); + // clusterId, secondaryBe, changeTimestamp + private Map<String, List<Long>> secondaryClusterToBackends = new ConcurrentHashMap<String, List<Long>>(); + public CloudReplica() { } @@ -186,8 +188,8 @@ public class CloudReplica extends Replica { backendId = memClusterToBackends.get(clusterId).get(indexRand); } - if (!replicaEnough && !allowColdRead && clusterToBackends.containsKey(clusterId)) { - backendId = clusterToBackends.get(clusterId).get(0); + if (!replicaEnough && !allowColdRead && primaryClusterToBackends.containsKey(clusterId)) { + backendId = primaryClusterToBackends.get(clusterId).get(0); } if (backendId > 0) { @@ -212,21 +214,47 @@ public class CloudReplica extends Replica { } } - if (clusterToBackends.containsKey(clusterId)) { - long backendId = clusterToBackends.get(clusterId).get(0); - Backend be = Env.getCurrentSystemInfo().getBackend(backendId); - if (be != null && be.isQueryAvailable()) { - if (LOG.isDebugEnabled()) { - LOG.debug("backendId={} ", backendId); - } - return backendId; + // use primaryClusterToBackends, if find be normal + long pickBeId = getAvaliableBeId(clusterId, primaryClusterToBackends); + if (pickBeId != -1) { + return pickBeId; + 
} + + if (!Config.enable_immediate_be_assign) { + // use secondaryClusterToBackends, if find be normal + pickBeId = getAvaliableBeId(clusterId, secondaryClusterToBackends); + if (pickBeId != -1) { + return pickBeId; } } - if (DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.clusterToBackends")) { - LOG.info("Debug Point enable CloudReplica.getBackendIdImpl.clusterToBackends"); + + if (DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.primaryClusterToBackends")) { + LOG.info("Debug Point enable CloudReplica.getBackendIdImpl.primaryClusterToBackends"); + return -1; + } + + // be abnormal, rehash it. configure settings to different maps + pickBeId = hashReplicaToBe(clusterId, false); + updateClusterToBe(clusterId, pickBeId, Config.enable_immediate_be_assign); + return pickBeId; + } + + private long getAvaliableBeId(String clusterId, Map<String, List<Long>> clusterToBackends) { + List<Long> backendIds = clusterToBackends.get(clusterId); + if (backendIds == null || backendIds.isEmpty()) { return -1; } - return hashReplicaToBe(clusterId, false); + + long backendId = backendIds.get(0); + Backend be = Env.getCurrentSystemInfo().getBackend(backendId); + if (be != null && be.isQueryAvailable()) { + // be normal + if (LOG.isDebugEnabled()) { + LOG.debug("backendId={} ", backendId); + } + return backendId; + } + return -1; } public long hashReplicaToBe(String clusterId, boolean isBackGround) { @@ -270,14 +298,10 @@ public class CloudReplica extends Replica { pickedBeId, getId(), partitionId, availableBes.size(), idx, index, hashCode == null ? 
-1 : hashCode.asLong()); - // save to clusterToBackends map - List<Long> bes = new ArrayList<Long>(); - bes.add(pickedBeId); - clusterToBackends.put(clusterId, bes); - return pickedBeId; } + public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int replicaNum) { // TODO(luwei) list should be sorted List<Backend> clusterBes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) @@ -375,7 +399,7 @@ public class CloudReplica extends Replica { long beId = in.readLong(); List<Long> bes = new ArrayList<Long>(); bes.add(beId); - clusterToBackends.put(clusterId, bes); + primaryClusterToBackends.put(clusterId, bes); } } @@ -399,14 +423,19 @@ public class CloudReplica extends Replica { return idx; } - public Map<String, List<Long>> getClusterToBackends() { - return clusterToBackends; + public Map<String, List<Long>> getprimaryClusterToBackends() { + return primaryClusterToBackends; } - public void updateClusterToBe(String cluster, long beId) { + // save to primaryClusterToBackends or secondaryClusterToBackends map + public void updateClusterToBe(String cluster, long beId, boolean isUpdatePrimary) { // write lock List<Long> bes = new ArrayList<Long>(); bes.add(beId); - clusterToBackends.put(cluster, bes); + if (isUpdatePrimary) { + primaryClusterToBackends.put(cluster, bes); + } else { + secondaryClusterToBackends.put(cluster, bes); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 6f4534c4b08..ae34c6b722f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -499,23 +499,24 @@ public class CloudTabletRebalancer extends MasterDaemon { List<Long> tabletIds = new ArrayList<Long>(); for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - 
Map<String, List<Long>> clusterToBackends = - ((CloudReplica) replica).getClusterToBackends(); - if (!clusterToBackends.containsKey(cluster)) { + Map<String, List<Long>> primaryClusterToBackends = + ((CloudReplica) replica).getprimaryClusterToBackends(); + if (!primaryClusterToBackends.containsKey(cluster)) { long beId = ((CloudReplica) replica).hashReplicaToBe(cluster, true); + ((CloudReplica) replica).updateClusterToBe(cluster, beId, true); if (beId <= 0) { assignedErrNum++; continue; } List<Long> bes = new ArrayList<Long>(); bes.add(beId); - clusterToBackends.put(cluster, bes); + primaryClusterToBackends.put(cluster, bes); assigned = true; beIds.add(beId); tabletIds.add(tablet.getId()); } else { - beIds.add(clusterToBackends.get(cluster).get(0)); + beIds.add(primaryClusterToBackends.get(cluster).get(0)); tabletIds.add(tablet.getId()); } } @@ -569,9 +570,9 @@ public class CloudTabletRebalancer extends MasterDaemon { loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> { for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - Map<String, List<Long>> clusterToBackends = - ((CloudReplica) replica).getClusterToBackends(); - for (Map.Entry<String, List<Long>> entry : clusterToBackends.entrySet()) { + Map<String, List<Long>> primaryClusterToBackends = + ((CloudReplica) replica).getprimaryClusterToBackends(); + for (Map.Entry<String, List<Long>> entry : primaryClusterToBackends.entrySet()) { if (!cluster.equals(entry.getKey())) { continue; } @@ -729,7 +730,7 @@ public class CloudTabletRebalancer extends MasterDaemon { private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId, List<UpdateCloudReplicaInfo> infos) { CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0); - cloudReplica.updateClusterToBe(clusterId, destBe); + cloudReplica.updateClusterToBe(clusterId, destBe, true); Database db = 
Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId()); if (db == null) { return; @@ -973,7 +974,7 @@ public class CloudTabletRebalancer extends MasterDaemon { String clusterId = be.getCloudClusterId(); String clusterName = be.getCloudClusterName(); // update replica location info - cloudReplica.updateClusterToBe(clusterId, dstBe); + cloudReplica.updateClusterToBe(clusterId, dstBe, true); LOG.info("cloud be migrate tablet {} from srcBe={} to dstBe={}, clusterId={}, clusterName={}", tablet.getId(), srcBe, dstBe, clusterId, clusterName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index ef461c74b21..b7a932486ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -975,7 +975,7 @@ public class CloudInternalCatalog extends InternalCatalog { clusterId = realClusterId; } - ((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeId()); + ((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeId(), true); LOG.debug("update single cloud replica cluster {} replica {} be {}", info.getClusterId(), replica.getId(), info.getBeId()); @@ -1001,7 +1001,7 @@ public class CloudInternalCatalog extends InternalCatalog { LOG.debug("update cloud replica cluster {} replica {} be {}", info.getClusterId(), replica.getId(), info.getBeIds().get(i)); - ((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeIds().get(i)); + ((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeIds().get(i), true); } } } catch (Exception e) { diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy index e1735a4acd4..0aa2e83ccc2 100644 --- 
a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy +++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy @@ -63,7 +63,7 @@ suite('test_rebalance_in_cloud', 'multi_cluster') { PARTITION p1998 VALUES [("19980101"), ("19990101"))) DISTRIBUTED BY HASH(k1) BUCKETS 3 """ - GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.clusterToBackends"); + GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends"); sql """set global forward_to_master=false""" // add a be --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org