This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7b4ecfb5728 [feature](merge-cloud) Fix cloud mode can't drop backends and refacto… (#30534) 7b4ecfb5728 is described below commit 7b4ecfb5728e27300354803ff531756e0af5d17c Author: deardeng <565620...@qq.com> AuthorDate: Wed Jan 31 19:53:54 2024 +0800 [feature](merge-cloud) Fix cloud mode can't drop backends and refacto… (#30534) --- .../main/java/org/apache/doris/catalog/Env.java | 24 +- .../doris/cloud/catalog/CloudClusterChecker.java | 33 +- .../org/apache/doris/cloud/catalog/CloudEnv.java | 23 +- .../cloud/catalog/CloudInstanceStatusChecker.java | 11 +- .../apache/doris/cloud/catalog/CloudReplica.java | 21 +- .../doris/cloud/load/CloudBrokerLoadJob.java | 9 +- .../doris/cloud/system/CloudSystemInfoService.java | 331 +++++++++++++++++---- .../java/org/apache/doris/qe/ConnectContext.java | 6 +- .../org/apache/doris/system/SystemInfoService.java | 177 ----------- 9 files changed, 336 insertions(+), 299 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 074c5cb7961..5dd1740aeaa 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -69,7 +69,6 @@ import org.apache.doris.analysis.RecoverDbStmt; import org.apache.doris.analysis.RecoverPartitionStmt; import org.apache.doris.analysis.RecoverTableStmt; import org.apache.doris.analysis.ReplacePartitionClause; -import org.apache.doris.analysis.ResourceTypeEnum; import org.apache.doris.analysis.RestoreStmt; import org.apache.doris.analysis.RollupRenameClause; import org.apache.doris.analysis.SetType; @@ -425,7 +424,7 @@ public class Env { private JournalObservable journalObservable; - private SystemInfoService systemInfo; + protected SystemInfoService systemInfo; private HeartbeatMgr heartbeatMgr; private TabletInvertedIndex tabletInvertedIndex; private ColocateTableIndex colocateTableIndex; @@ -4999,27 +4998,6 @@ public class Env { this.alter.getClusterHandler().cancel(stmt); } - public void checkCloudClusterPriv(String clusterName) throws DdlException { - // check resource usage privilege - if (!Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(), - clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) { - throw new DdlException("USAGE denied to user " - + ConnectContext.get().getQualifiedUser() + "'@'" + ConnectContext.get().getRemoteIP() - + "' for cloud cluster '" + clusterName + "'", ErrorCode.ERR_CLUSTER_NO_PERMISSIONS); - } - - if (!Env.getCurrentSystemInfo().getCloudClusterNames().contains(clusterName)) { - LOG.debug("current instance does not have a cluster name :{}", clusterName); - throw new DdlException(String.format("Cluster %s not exist", clusterName), - ErrorCode.ERR_CLOUD_CLUSTER_ERROR); - } - } - - public static void waitForAutoStart(final String clusterName) throws DdlException { - // TODO: merge from cloud. - throw new DdlException("Env.waitForAutoStart unimplemented"); - } - // Switch catalog of this sesseion. public void changeCatalog(ConnectContext ctx, String catalogName) throws DdlException { CatalogIf catalogIf = catalogMgr.getCatalogNullable(catalogName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java index 5288cd73ea7..50f29bdc4f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java @@ -52,8 +52,11 @@ import java.util.stream.Collectors; public class CloudClusterChecker extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(CloudClusterChecker.class); - public CloudClusterChecker() { + private CloudSystemInfoService cloudSystemInfoService; + + public CloudClusterChecker(CloudSystemInfoService cloudSystemInfoService) { super("cloud cluster check", Config.cloud_cluster_check_interval_second * 1000L); + this.cloudSystemInfoService = cloudSystemInfoService; } /** @@ -117,7 +120,7 @@ public class CloudClusterChecker extends MasterDaemon { b.setTagMap(newTagMap); toAdd.add(b); } - Env.getCurrentSystemInfo().updateCloudBackends(toAdd, new ArrayList<>()); + cloudSystemInfoService.updateCloudBackends(toAdd, new ArrayList<>()); } ); } @@ -133,15 +136,15 @@ public class CloudClusterChecker extends MasterDaemon { LOG.debug("begin to drop clusterId: {}", delId); List<Backend> toDel = new ArrayList<>(finalClusterIdToBackend.getOrDefault(delId, new ArrayList<>())); - Env.getCurrentSystemInfo().updateCloudBackends(new ArrayList<>(), toDel); + cloudSystemInfoService.updateCloudBackends(new ArrayList<>(), toDel); // del clusterName - String delClusterName = Env.getCurrentSystemInfo().getClusterNameByClusterId(delId); + String delClusterName = cloudSystemInfoService.getClusterNameByClusterId(delId); if (delClusterName.isEmpty()) { LOG.warn("can't get delClusterName, clusterId: {}, plz check", delId); return; } // del clusterID - Env.getCurrentSystemInfo().dropCluster(delId, delClusterName); + cloudSystemInfoService.dropCluster(delId, delClusterName); } ); } @@ -202,12 +205,12 @@ public class CloudClusterChecker extends MasterDaemon { // change all be's cluster_name currentBes.forEach(b -> b.setCloudClusterName(newClusterName)); // update clusterNameToId - Env.getCurrentSystemInfo().updateClusterNameToId(newClusterName, currentClusterName, cid); + cloudSystemInfoService.updateClusterNameToId(newClusterName, currentClusterName, cid); // update tags currentBes.forEach(b -> Env.getCurrentEnv().getEditLog().logModifyBackend(b)); } - String currentClusterStatus = Env.getCurrentSystemInfo().getCloudStatusById(cid); + String currentClusterStatus = cloudSystemInfoService.getCloudStatusById(cid); // For old versions that do no have status field set ClusterStatus clusterStatus = cp.hasClusterStatus() ? cp.getClusterStatus() : ClusterStatus.NORMAL; @@ -286,7 +289,7 @@ public class CloudClusterChecker extends MasterDaemon { continue; } - Env.getCurrentSystemInfo().updateCloudBackends(toAdd, toDel); + cloudSystemInfoService.updateCloudBackends(toAdd, toDel); } } @@ -299,7 +302,7 @@ public class CloudClusterChecker extends MasterDaemon { private void checkFeNodesMapValid() { LOG.debug("begin checkFeNodesMapValid"); - Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend(); + Map<String, List<Backend>> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend(); Set<String> clusterIds = new HashSet<>(); Set<String> clusterNames = new HashSet<>(); clusterIdToBackend.forEach((clusterId, bes) -> { @@ -313,7 +316,7 @@ public class CloudClusterChecker extends MasterDaemon { }); }); - Map<String, String> nameToId = Env.getCurrentSystemInfo().getCloudClusterNameToId(); + Map<String, String> nameToId = cloudSystemInfoService.getCloudClusterNameToId(); nameToId.forEach((clusterName, clusterId) -> { if (!clusterIdToBackend.containsKey(clusterId)) { LOG.warn("impossible, somewhere err, clusterId {}, clusterName {}, clusterNameToIdMap {}", @@ -404,7 +407,7 @@ public class CloudClusterChecker extends MasterDaemon { } private void getCloudBackends() { - Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend(); + Map<String, List<Backend>> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend(); //rpc to ms, to get mysql user can use cluster_id // NOTE: rpc args all empty, use cluster_unique_id to get a instance's all cluster info. Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster("", "", ""); @@ -443,14 +446,14 @@ public class CloudClusterChecker extends MasterDaemon { LOG.warn("diff cluster has exception, {}", e.getMessage(), e); } - LOG.info("daemon cluster get cluster info succ, current cloudClusterIdToBackendMap: {}", - Env.getCurrentSystemInfo().getCloudClusterIdToBackend()); + LOG.info("daemon cluster get cluster info succ, current cloudClusterIdToBackendMap: {} clusterNameToId {}", + cloudSystemInfoService.getCloudClusterIdToBackend(), cloudSystemInfoService.getCloudClusterNameToId()); } private void updateCloudMetrics() { // Metric - Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend(); - Map<String, String> clusterNameToId = Env.getCurrentSystemInfo().getCloudClusterNameToId(); + Map<String, List<Backend>> clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend(); + Map<String, String> clusterNameToId = cloudSystemInfoService.getCloudClusterNameToId(); for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) { long aliveNum = 0L; List<Backend> bes = clusterIdToBackend.get(entry.getValue()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index 888eee5a0c1..f43704beb58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -17,17 +17,22 @@ package org.apache.doris.cloud.catalog; +import org.apache.doris.analysis.ResourceTypeEnum; import org.apache.doris.catalog.Env; import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.proto.Cloud.NodeInfoPB; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.util.HttpURLUtil; import org.apache.doris.common.util.NetUtils; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.httpv2.meta.MetaBaseAction; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.Storage; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService.HostInfo; @@ -52,7 +57,7 @@ public class CloudEnv extends Env { public CloudEnv(boolean isCheckpointCatalog) { super(isCheckpointCatalog); - this.cloudClusterCheck = new CloudClusterChecker(); + this.cloudClusterCheck = new CloudClusterChecker((CloudSystemInfoService) systemInfo); } protected void startMasterOnlyDaemonThreads() { @@ -355,5 +360,21 @@ public class CloudEnv extends Env { public long saveTransactionState(CountingDataOutputStream dos, long checksum) throws IOException { return checksum; } + + public void checkCloudClusterPriv(String clusterName) throws DdlException { + // check resource usage privilege + if (!Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(), + clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) { + throw new DdlException("USAGE denied to user " + + ConnectContext.get().getQualifiedUser() + "'@'" + ConnectContext.get().getRemoteIP() + + "' for cloud cluster '" + clusterName + "'", ErrorCode.ERR_CLUSTER_NO_PERMISSIONS); + } + + if (!((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames().contains(clusterName)) { + LOG.debug("current instance does not have a cluster name :{}", clusterName); + throw new DdlException(String.format("Cluster %s not exist", clusterName), + ErrorCode.ERR_CLOUD_CLUSTER_ERROR); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java index b8329deffbb..2ac0ee8fe67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java @@ -17,8 +17,8 @@ package org.apache.doris.cloud.catalog; -import org.apache.doris.catalog.Env; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; @@ -27,23 +27,24 @@ import org.apache.logging.log4j.Logger; public class CloudInstanceStatusChecker extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(CloudInstanceStatusChecker.class); + private CloudSystemInfoService cloudSystemInfoService; - public CloudInstanceStatusChecker() { + public CloudInstanceStatusChecker(CloudSystemInfoService cloudSystemInfoService) { super("cloud instance check"); + this.cloudSystemInfoService = cloudSystemInfoService; } @Override protected void runAfterCatalogReady() { try { - Cloud.GetInstanceResponse response = - Env.getCurrentSystemInfo().getCloudInstance(); + Cloud.GetInstanceResponse response = cloudSystemInfoService.getCloudInstance(); LOG.debug("get from ms response {}", response); if (!response.hasStatus() || !response.getStatus().hasCode() || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { LOG.warn("failed to get cloud instance due to incomplete response, " + "cloud_unique_id={}, response={}", Config.cloud_unique_id, response); } else { - Env.getCurrentSystemInfo().setInstanceStatus(response.getInstance().getStatus()); + cloudSystemInfoService.setInstanceStatus(response.getInstance().getStatus()); } } catch (Exception e) { LOG.warn("get instance from ms exception", e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index 5054fa7e41e..8e9b279bc5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -20,6 +20,7 @@ package org.apache.doris.cloud.catalog; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; @@ -81,7 +82,7 @@ public class CloudReplica extends Replica { } private long getColocatedBeId(String cluster) { - List<Backend> bes = Env.getCurrentSystemInfo().getBackendsByClusterId(cluster); + List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getBackendsByClusterId(cluster); List<Backend> availableBes = new ArrayList<>(); for (Backend be : bes) { if (be.isAlive()) { @@ -110,7 +111,7 @@ public class CloudReplica extends Replica { if (!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) { cluster = context.getSessionVariable().getCloudCluster(); try { - Env.getCurrentEnv().checkCloudClusterPriv(cluster); + ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster); } catch (Exception e) { LOG.warn("get cluster by session context exception"); return -1; @@ -127,7 +128,8 @@ public class CloudReplica extends Replica { // check default cluster valid. if (!Strings.isNullOrEmpty(cluster)) { - boolean exist = Env.getCurrentSystemInfo().getCloudClusterNames().contains(cluster); + boolean exist = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterNames().contains(cluster); if (!exist) { //can't use this default cluster, plz change another LOG.warn("cluster: {} is not existed", cluster); @@ -140,12 +142,12 @@ public class CloudReplica extends Replica { // if cluster is SUSPENDED, wait try { - Env.waitForAutoStart(cluster); + CloudSystemInfoService.waitForAutoStart(cluster); } catch (DdlException e) { // this function cant throw exception. so just log it LOG.warn("cant resume cluster {}", cluster); } - String clusterId = Env.getCurrentSystemInfo().getCloudClusterIdByName(cluster); + String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdByName(cluster); if (isColocated()) { return getColocatedBeId(clusterId); @@ -201,7 +203,8 @@ public class CloudReplica extends Replica { public long hashReplicaToBe(String clusterId, boolean isBackGround) { // TODO(luwei) list should be sorted - List<Backend> clusterBes = Env.getCurrentSystemInfo().getBackendsByClusterId(clusterId); + List<Backend> clusterBes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getBackendsByClusterId(clusterId); // use alive be to exec sql List<Backend> availableBes = new ArrayList<>(); for (Backend be : clusterBes) { @@ -247,7 +250,8 @@ public class CloudReplica extends Replica { public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int replicaNum) { // TODO(luwei) list should be sorted - List<Backend> clusterBes = Env.getCurrentSystemInfo().getBackendsByClusterId(clusterId); + List<Backend> clusterBes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getBackendsByClusterId(clusterId); // use alive be to exec sql List<Backend> availableBes = new ArrayList<>(); for (Backend be : clusterBes) { @@ -325,7 +329,8 @@ public class CloudReplica extends Replica { int count = in.readInt(); for (int i = 0; i < count; ++i) { String clusterId = Text.readString(in); - String realClusterId = Env.getCurrentSystemInfo().getCloudClusterIdByName(clusterId); + String realClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterIdByName(clusterId); LOG.debug("cluster Id {}, real cluster Id {}", clusterId, realClusterId); if (!Strings.isNullOrEmpty(realClusterId)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java index f56cec83b99..8381d2d0959 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.load.BrokerFileGroup; @@ -61,11 +62,12 @@ public class CloudBrokerLoadJob extends BrokerLoadJob { throw new MetaNotFoundException("cluster name is empty"); } - this.cloudClusterId = Env.getCurrentSystemInfo().getCloudClusterIdByName(clusterName); + this.cloudClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterIdByName(clusterName); if (!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) { clusterName = context.getSessionVariable().getCloudCluster(); this.cloudClusterId = - Env.getCurrentSystemInfo().getCloudClusterIdByName(clusterName); + ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdByName(clusterName); } if (Strings.isNullOrEmpty(this.cloudClusterId)) { LOG.warn("cluster id is empty, cluster name {}", clusterName); @@ -77,7 +79,8 @@ public class CloudBrokerLoadJob extends BrokerLoadJob { private AutoCloseConnectContext buildConnectContext() throws UserException { cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID); - String clusterName = Env.getCurrentSystemInfo().getClusterNameByClusterId(cloudClusterId); + String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getClusterNameByClusterId(cloudClusterId); if (Strings.isNullOrEmpty(clusterName)) { LOG.warn("cluster name is empty, cluster id is {}", cloudClusterId); throw new UserException("cluster name is empty, cluster id is: " + cloudClusterId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 681f7e195e6..dba7daef355 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -20,9 +20,13 @@ package org.apache.doris.cloud.system; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.proto.Cloud.ClusterPB; +import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB; import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.metric.MetricRepo; import org.apache.doris.resource.Tag; @@ -32,6 +36,7 @@ import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -41,13 +46,30 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; public class CloudSystemInfoService extends SystemInfoService { private static final Logger LOG = LogManager.getLogger(CloudSystemInfoService.class); + // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk of inconsistency + // use exclusive lock to make sure only one thread can change clusterIdToBackend and clusterNameToId + protected ReentrantLock lock = new ReentrantLock(); + + // for show cluster and cache user owned cluster + // mysqlUserName -> List of ClusterPB + private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB = ImmutableMap.of(); + // clusterId -> List<Backend> + protected Map<String, List<Backend>> clusterIdToBackend = new ConcurrentHashMap<>(); + // clusterName -> clusterId + protected Map<String, String> clusterNameToId = new ConcurrentHashMap<>(); + + private InstanceInfoPB.Status instanceStatus; + @Override public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation( ReplicaAllocation replicaAlloc, Map<Tag, Integer> nextIndexs, @@ -61,7 +83,7 @@ public class CloudSystemInfoService extends SystemInfoService { * Gets cloud cluster from remote with either clusterId or clusterName * * @param clusterName cluster name - * @param clusterId cluster id + * @param clusterId cluster id * @return */ public static Cloud.GetClusterResponse getCloudCluster(String clusterName, String clusterId, String userName) { @@ -86,7 +108,7 @@ public class CloudSystemInfoService extends SystemInfoService { LOG.info("nothing to do"); return; } - Set<String> existedBes = idToBackendRef.entrySet().stream().map(i -> i.getValue()) + Set<String> existedBes = idToBackendRef.values().stream() .map(i -> i.getHost() + ":" + i.getHeartbeatPort()) .collect(Collectors.toSet()); LOG.debug("deduplication existedBes={}", existedBes); @@ -157,79 +179,83 @@ public class CloudSystemInfoService extends SystemInfoService { public void updateCloudClusterMap(List<Backend> toAdd, List<Backend> toDel) { lock.lock(); - Set<String> clusterNameSet = new HashSet<>(); - for (Backend b : toAdd) { - String clusterName = b.getCloudClusterName(); - String clusterId = b.getCloudClusterId(); - if (clusterName.isEmpty() || clusterId.isEmpty()) { - LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName); - continue; - } - clusterNameSet.add(clusterName); - if (clusterNameSet.size() != 1) { - LOG.warn("toAdd be list have multi clusterName, please check, Set: {}", clusterNameSet); - } + try { + Set<String> clusterNameSet = new HashSet<>(); + for (Backend b : toAdd) { + String clusterName = b.getCloudClusterName(); + String clusterId = b.getCloudClusterId(); + if (clusterName.isEmpty() || clusterId.isEmpty()) { + LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName); + continue; + } + clusterNameSet.add(clusterName); + if (clusterNameSet.size() != 1) { + LOG.warn("toAdd be list have multi clusterName, please check, Set: {}", clusterNameSet); + } - clusterNameToId.put(clusterName, clusterId); - List<Backend> be = clusterIdToBackend.get(clusterId); - if (be == null) { - be = new ArrayList<>(); - clusterIdToBackend.put(clusterId, be); - MetricRepo.registerClusterMetrics(clusterName, clusterId); - } - Set<String> existed = be.stream().map(i -> i.getHost() + ":" + i.getHeartbeatPort()) - .collect(Collectors.toSet()); - // Deduplicate - // TODO(gavin): consider vpc - boolean alreadyExisted = existed.contains(b.getHost() + ":" + b.getHeartbeatPort()); - if (alreadyExisted) { - LOG.info("BE already existed, clusterName={} clusterId={} backendNum={} backend={}", + clusterNameToId.put(clusterName, clusterId); + List<Backend> be = clusterIdToBackend.get(clusterId); + if (be == null) { + be = new ArrayList<>(); + clusterIdToBackend.put(clusterId, be); + MetricRepo.registerClusterMetrics(clusterName, clusterId); + } + Set<String> existed = be.stream().map(i -> i.getHost() + ":" + i.getHeartbeatPort()) + .collect(Collectors.toSet()); + // Deduplicate + // TODO(gavin): consider vpc + boolean alreadyExisted = existed.contains(b.getHost() + ":" + b.getHeartbeatPort()); + if (alreadyExisted) { + LOG.info("BE already existed, clusterName={} clusterId={} backendNum={} backend={}", + clusterName, clusterId, be.size(), b); + continue; + } + be.add(b); + LOG.info("update (add) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}", clusterName, clusterId, be.size(), b); - continue; } - be.add(b); - LOG.info("update (add) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}", - clusterName, clusterId, be.size(), b); - } - for (Backend b : toDel) { - String clusterName = b.getCloudClusterName(); - String clusterId = b.getCloudClusterId(); - // We actually don't care about cluster name here - if (clusterName.isEmpty() || clusterId.isEmpty()) { - LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName); - continue; - } - List<Backend> be = clusterIdToBackend.get(clusterId); - if (be == null) { - LOG.warn("try to remove a non-existing cluster, clusterId={} clusterName={}", - clusterId, clusterName); - continue; - } - Set<Long> d = toDel.stream().map(i -> i.getId()).collect(Collectors.toSet()); - be = be.stream().filter(i -> !d.contains(i.getId())).collect(Collectors.toList()); - // ATTN: clusterId may have zero nodes - clusterIdToBackend.replace(clusterId, be); - // such as dropCluster, but no lock - // ATTN: Empty clusters are treated as dropped clusters. - if (be.size() == 0) { - LOG.info("del clusterId {} and clusterName {} due to be nodes eq 0", clusterId, clusterName); - boolean succ = clusterNameToId.remove(clusterName, clusterId); - if (!succ) { - LOG.warn("impossible, somewhere err, clusterNameToId {}, " - + "want remove cluster name {}, cluster id {}", clusterNameToId, clusterName, clusterId); + for (Backend b : toDel) { + String clusterName = b.getCloudClusterName(); + String clusterId = b.getCloudClusterId(); + // We actually don't care about cluster name here + if (clusterName.isEmpty() || clusterId.isEmpty()) { + LOG.warn("cloud cluster name or id empty: id={}, name={}", clusterId, clusterName); + continue; } - clusterIdToBackend.remove(clusterId); + List<Backend> be = clusterIdToBackend.get(clusterId); + if (be == null) { + LOG.warn("try to remove a non-existing cluster, clusterId={} clusterName={}", + clusterId, clusterName); + continue; + } + Set<Long> d = toDel.stream().map(i -> i.getId()).collect(Collectors.toSet()); + be = be.stream().filter(i -> !d.contains(i.getId())).collect(Collectors.toList()); + // ATTN: clusterId may have zero nodes + clusterIdToBackend.replace(clusterId, be); + // such as dropCluster, but no lock + // ATTN: Empty clusters are treated as dropped clusters. + if (be.size() == 0) { + LOG.info("del clusterId {} and clusterName {} due to be nodes eq 0", clusterId, clusterName); + boolean succ = clusterNameToId.remove(clusterName, clusterId); + if (!succ) { + LOG.warn("impossible, somewhere err, clusterNameToId {}, " + + "want remove cluster name {}, cluster id {}", + clusterNameToId, clusterName, clusterId); + } + clusterIdToBackend.remove(clusterId); + } + LOG.info("update (del) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}", + clusterName, clusterId, be.size(), b); } - LOG.info("update (del) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}", - clusterName, clusterId, be.size(), b); + } finally { + lock.unlock(); } - lock.unlock(); } public static synchronized void updateFrontends(List<Frontend> toAdd, - List<Frontend> toDel) throws DdlException { + List<Frontend> toDel) throws DdlException { LOG.debug("updateCloudFrontends toAdd={} toDel={}", toAdd, toDel); String masterIp = Env.getCurrentEnv().getMasterHost(); for (Frontend fe : toAdd) { @@ -248,5 +274,180 @@ public class CloudSystemInfoService extends SystemInfoService { LOG.info("dropped cloud frontend={} ", fe); } } -} + public void replayAddBackend(Backend newBackend) { + super.replayAddBackend(newBackend); + List<Backend> toAdd = new ArrayList<>(); + toAdd.add(newBackend); + updateCloudClusterMap(toAdd, new ArrayList<>()); + } + + public void replayDropBackend(Backend backend) { + super.replayDropBackend(backend); + List<Backend> toDel = new ArrayList<>(); + toDel.add(backend); + updateCloudClusterMap(new ArrayList<>(), toDel); + } + + public boolean availableBackendsExists() { + if (FeConstants.runningUnitTest) { + return true; + } + if (null == clusterNameToId || clusterNameToId.isEmpty()) { + return false; + } + return clusterIdToBackend != null && !clusterIdToBackend.isEmpty() + && clusterIdToBackend.values().stream().anyMatch(list -> list != null && !list.isEmpty()); + } + + public boolean containClusterName(String clusterName) { + return clusterNameToId.containsKey(clusterName); + } + + public List<Backend> getBackendsByClusterName(final String clusterName) { + String clusterId = clusterNameToId.getOrDefault(clusterName, ""); + if (clusterId.isEmpty()) { + return new ArrayList<>(); + } + return clusterIdToBackend.get(clusterId); + } + + public List<Backend> getBackendsByClusterId(final String clusterId) { + return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>()); + } + + public List<String> getCloudClusterIds() { + return new ArrayList<>(clusterIdToBackend.keySet()); + } + + public String getCloudStatusByName(final String clusterName) { + String clusterId = clusterNameToId.getOrDefault(clusterName, ""); + if (Strings.isNullOrEmpty(clusterId)) { + // for rename cluster or dropped cluster + LOG.warn("cant find clusterId by clusterName {}", clusterName); + return ""; + } + return getCloudStatusById(clusterId); + } + + public String getCloudStatusById(final String clusterId) { + return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>()) + .stream().map(Backend::getCloudClusterStatus).findFirst().orElse(""); + } + + public void updateClusterNameToId(final String newName, + final String originalName, final String clusterId) { + lock.lock(); + try { + clusterNameToId.remove(originalName); + clusterNameToId.put(newName, clusterId); + } finally { + lock.unlock(); + } + } + + public String getClusterNameByClusterId(final String clusterId) { + String clusterName = ""; + for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) { + if (entry.getValue().equals(clusterId)) { + clusterName = entry.getKey(); + break; + } + } + return clusterName; + } + + public void dropCluster(final String clusterId, final String clusterName) { + lock.lock(); + try { + clusterNameToId.remove(clusterName, clusterId); + clusterIdToBackend.remove(clusterId); + } finally { + lock.unlock(); + } + } + + public List<String> getCloudClusterNames() { + return new ArrayList<>(clusterNameToId.keySet()); + } + + // Return the ref of concurrentMap clusterIdToBackend + // It should be thread-safe to iterate. + // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe + public Map<String, List<Backend>> getCloudClusterIdToBackend() { + return clusterIdToBackend; + } + + public String getCloudClusterIdByName(String clusterName) { + return clusterNameToId.get(clusterName); + } + + public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName) { + String clusterId = clusterNameToId.get(clusterName); + if (Strings.isNullOrEmpty(clusterId)) { + LOG.warn("cant find clusterId, this cluster may be has been dropped, clusterName={}", clusterName); + return ImmutableMap.of(); + } + List<Backend> backends = clusterIdToBackend.get(clusterId); + Map<Long, Backend> idToBackend = Maps.newHashMap(); + for (Backend be : backends) { + idToBackend.put(be.getId(), be); + } + return ImmutableMap.copyOf(idToBackend); + } + + // Return the ref of concurrentMap clusterNameToId + // It should be thread-safe to iterate. + // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe + public Map<String, String> getCloudClusterNameToId() { + return clusterNameToId; + } + + public Map<String, List<ClusterPB>> getMysqlUserNameToClusterPb() { + return mysqlUserNameToClusterPB; + } + + public void updateMysqlUserNameToClusterPb(Map<String, List<ClusterPB>> m) { + mysqlUserNameToClusterPB = m; + } + + public List<Pair<String, Integer>> getCurrentObFrontends() { + List<Frontend> frontends = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER); + List<Pair<String, Integer>> frontendsPair = new ArrayList<>(); + for (Frontend frontend : frontends) { + frontendsPair.add(Pair.of(frontend.getHost(), frontend.getEditLogPort())); + } + return frontendsPair; + } + + public Cloud.GetInstanceResponse getCloudInstance() { + Cloud.GetInstanceRequest.Builder builder = Cloud.GetInstanceRequest.newBuilder(); + builder.setCloudUniqueId(Config.cloud_unique_id); + final Cloud.GetInstanceRequest pRequest = builder.build(); + Cloud.GetInstanceResponse response; + try { + response = MetaServiceProxy.getInstance().getInstance(pRequest); + return response; + } catch (RpcException e) { + LOG.warn("rpcToGetInstance exception: {}", e.getMessage()); + } + return null; + } + + public InstanceInfoPB.Status getInstanceStatus() { + return this.instanceStatus; + } + + public void setInstanceStatus(InstanceInfoPB.Status instanceStatus) { + LOG.debug("fe set instance status {}", instanceStatus); + if (this.instanceStatus != instanceStatus) { + LOG.info("fe change instance status from {} to {}", this.instanceStatus, instanceStatus); + } + this.instanceStatus = instanceStatus; + } + + public static void waitForAutoStart(final String clusterName) throws DdlException { + // TODO: merge from cloud. + throw new DdlException("Env.waitForAutoStart unimplemented"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index b5794464d98..95f0c5831a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FunctionRegistry; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; @@ -1082,11 +1083,12 @@ public class ConnectContext { } public String getAuthorizedCloudCluster() { - List<String> cloudClusterNames = Env.getCurrentSystemInfo().getCloudClusterNames(); + List<String> cloudClusterNames = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames(); // get all available cluster of the user for (String cloudClusterName : cloudClusterNames) { // find a cluster has more than one alive be - List<Backend> bes = Env.getCurrentSystemInfo().getBackendsByClusterName(cloudClusterName); + List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getBackendsByClusterName(cloudClusterName); AtomicBoolean hasAliveBe = new AtomicBoolean(false); bes.stream().filter(Backend::isAlive).findAny().ifPresent(backend -> { LOG.debug("get a clusterName {}, it's has more than one alive be {}", clusterName, backend); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index f45086f239d..05ba7e073ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -23,10 +23,6 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; -import org.apache.doris.cloud.proto.Cloud; -import org.apache.doris.cloud.proto.Cloud.ClusterPB; -import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB; -import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -36,11 +32,9 @@ import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.util.NetUtils; -import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.metric.MetricRepo; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; -import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TNodeInfo; import org.apache.doris.thrift.TPaloNodesInfo; import org.apache.doris.thrift.TStatusCode; @@ -69,9 +63,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; public class SystemInfoService { @@ -88,22 +80,9 @@ public class SystemInfoService { protected volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of(); protected volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of(); - // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk of inconsistency - // use exclusive lock to make sure only one thread can change clusterIdToBackend and clusterNameToId - protected ReentrantLock lock = new ReentrantLock(); - - // for show cluster and cache user owned cluster - // mysqlUserName -> List of ClusterPB - private Map<String, List<ClusterPB>> mysqlUserNameToClusterPB = ImmutableMap.of(); - // clusterId -> List<Backend> - protected Map<String, List<Backend>> clusterIdToBackend = new ConcurrentHashMap<>(); - // clusterName -> clusterId - protected Map<String, String> clusterNameToId = new ConcurrentHashMap<>(); private volatile ImmutableMap<Long, DiskInfo> pathHashToDiskInfoRef = ImmutableMap.of(); - private InstanceInfoPB.Status instanceStatus; - public static class HostInfo implements Comparable<HostInfo> { public String host; public int port; @@ -183,114 +162,6 @@ public class SystemInfoService { } }; - public boolean availableBackendsExists() { - if (FeConstants.runningUnitTest) { - return true; - } - if (null == clusterNameToId || clusterNameToId.isEmpty()) { - return false; - } - return clusterIdToBackend != null && !clusterIdToBackend.isEmpty() - && clusterIdToBackend.values().stream().anyMatch(list -> list != null && !list.isEmpty()); - } - - public boolean containClusterName(String clusterName) { - return clusterNameToId.containsKey(clusterName); - } - - public List<Backend> getBackendsByClusterName(final String clusterName) { - String clusterId = clusterNameToId.getOrDefault(clusterName, ""); - if (clusterId.isEmpty()) { - return new ArrayList<>(); - } - return clusterIdToBackend.get(clusterId); - } - - public List<Backend> getBackendsByClusterId(final String clusterId) { - return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>()); - } - - public List<String> getCloudClusterIds() { - return new ArrayList<>(clusterIdToBackend.keySet()); - } - - public String getCloudStatusByName(final String clusterName) { - String clusterId = clusterNameToId.getOrDefault(clusterName, ""); - if (Strings.isNullOrEmpty(clusterId)) { - // for rename cluster or dropped cluster - LOG.warn("cant find clusterId by clusterName {}", clusterName); - return ""; - } - return getCloudStatusById(clusterId); - } - - public String getCloudStatusById(final String clusterId) { - return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>()) - .stream().map(Backend::getCloudClusterStatus).findFirst().orElse(""); - } - - public void updateClusterNameToId(final String newName, - final String originalName, final String clusterId) { - lock.lock(); - clusterNameToId.remove(originalName); - clusterNameToId.put(newName, clusterId); - lock.unlock(); - } - - public String getClusterNameByClusterId(final String clusterId) { - String clusterName = ""; - for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) { - if (entry.getValue().equals(clusterId)) { - clusterName = entry.getKey(); - break; - } - } - return clusterName; - } - - public void dropCluster(final String clusterId, final String clusterName) { - lock.lock(); - clusterNameToId.remove(clusterName, clusterId); - clusterIdToBackend.remove(clusterId); - lock.unlock(); - } - - public List<String> getCloudClusterNames() { - return new ArrayList<>(clusterNameToId.keySet()); - } - - // Return the ref of concurrentMap clusterIdToBackend - // It should be thread-safe to iterate. - // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe - public Map<String, List<Backend>> getCloudClusterIdToBackend() { - return clusterIdToBackend; - } - - public String getCloudClusterIdByName(String clusterName) { - return clusterNameToId.get(clusterName); - } - - public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName) { - String clusterId = clusterNameToId.get(clusterName); - if (Strings.isNullOrEmpty(clusterId)) { - LOG.warn("cant find clusterId, this cluster may be has been dropped, clusterName={}", clusterName); - return ImmutableMap.of(); - } - List<Backend> backends = clusterIdToBackend.get(clusterId); - Map<Long, Backend> idToBackend = Maps.newHashMap(); - for (Backend be : backends) { - idToBackend.put(be.getId(), be); - } - return ImmutableMap.copyOf(idToBackend); - } - - // Return the ref of concurrentMap clusterNameToId - // It should be thread-safe to iterate. - // reference: https://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe - public Map<String, String> getCloudClusterNameToId() { - return clusterNameToId; - } - public static TPaloNodesInfo createAliveNodesInfo() { TPaloNodesInfo nodesInfo = new TPaloNodesInfo(); SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); @@ -301,23 +172,6 @@ public class SystemInfoService { return nodesInfo; } - public Map<String, List<ClusterPB>> getMysqlUserNameToClusterPb() { - return mysqlUserNameToClusterPB; - } - - public void updateMysqlUserNameToClusterPb(Map<String, List<ClusterPB>> m) { - mysqlUserNameToClusterPB = m; - } - - public List<Pair<String, Integer>> getCurrentObFrontends() { - List<Frontend> frontends = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER); - List<Pair<String, Integer>> frontendsPair = new ArrayList<>(); - for (Frontend frontend : frontends) { - frontendsPair.add(Pair.of(frontend.getHost(), frontend.getEditLogPort())); - } - return frontendsPair; - } - // for deploy manager public void addBackends(List<HostInfo> hostInfos, boolean isFree) throws UserException { @@ -1141,35 +995,4 @@ public class SystemInfoService { public long aliveBECount() { return idToBackendRef.values().stream().filter(Backend::isAlive).count(); } - - public Cloud.GetInstanceResponse getCloudInstance() { - Cloud.GetInstanceRequest.Builder builder = - Cloud.GetInstanceRequest.newBuilder(); - builder.setCloudUniqueId(Config.cloud_unique_id); - final Cloud.GetInstanceRequest pRequest = builder.build(); - Cloud.GetInstanceResponse response; - try { - response = MetaServiceProxy.getInstance().getInstance(pRequest); - return response; - } catch (RpcException e) { - LOG.warn("rpcToGetInstance exception: {}", e.getMessage()); - } - return null; - } - - public InstanceInfoPB.Status getInstanceStatus() { - return this.instanceStatus; - } - - public void setInstanceStatus(InstanceInfoPB.Status instanceStatus) { - LOG.debug("fe set instance status {}", instanceStatus); - if (this.instanceStatus != instanceStatus) { - LOG.info("fe change instance status from {} to {}", this.instanceStatus, instanceStatus); - } - this.instanceStatus = instanceStatus; - } - - public synchronized void updateCloudBackends(List<Backend> toAdd, List<Backend> toDel) { - LOG.warn("Not cloud mode, should not be here"); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org