This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3dade1c03c6 [feature](merge-cloud) Fix cloud cluster checker cant get backends (#30467) 3dade1c03c6 is described below commit 3dade1c03c63ae59658aecbc5e42cd8d1d152776 Author: deardeng <565620...@qq.com> AuthorDate: Mon Jan 29 20:06:58 2024 +0800 [feature](merge-cloud) Fix cloud cluster checker cant get backends (#30467) --- .../doris/cloud/catalog/CloudClusterChecker.java | 157 +++++++++++---------- 1 file changed, 84 insertions(+), 73 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java index 527909c7bf0..5288cd73ea7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java @@ -20,6 +20,7 @@ package org.apache.doris.cloud.catalog; import org.apache.doris.catalog.Env; import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.proto.Cloud.ClusterPB; +import org.apache.doris.cloud.proto.Cloud.ClusterPB.Type; import org.apache.doris.cloud.proto.Cloud.ClusterStatus; import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; import org.apache.doris.cloud.system.CloudSystemInfoService; @@ -291,78 +292,9 @@ public class CloudClusterChecker extends MasterDaemon { @Override protected void runAfterCatalogReady() { - Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend(); - //rpc to ms, to get mysql user can use cluster_id - // NOTE: rpc args all empty, use cluster_unique_id to get a instance's all cluster info. - Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster("", "", ""); - if (!response.hasStatus() || !response.getStatus().hasCode() - || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK - && response.getStatus().getCode() != MetaServiceCode.CLUSTER_NOT_FOUND)) { - LOG.warn("failed to get cloud cluster due to incomplete response, " - + "cloud_unique_id={}, response={}", Config.cloud_unique_id, response); - } else { - // clusterId -> clusterPB - Map<String, ClusterPB> remoteClusterIdToPB = new HashMap<>(); - Set<String> localClusterIds = clusterIdToBackend.keySet(); - - try { - // cluster_ids diff remote <clusterId, nodes> and local <clusterId, nodes> - // remote - local > 0, add bes to local - checkToAddCluster(remoteClusterIdToPB, localClusterIds); - - // local - remote > 0, drop bes from local - checkToDelCluster(remoteClusterIdToPB, localClusterIds, clusterIdToBackend); - - if (remoteClusterIdToPB.keySet().size() != clusterIdToBackend.keySet().size()) { - LOG.warn("impossible cluster id size not match, check it local {}, remote {}", - clusterIdToBackend, remoteClusterIdToPB); - } - // clusterID local == remote, diff nodes - checkDiffNode(remoteClusterIdToPB, clusterIdToBackend); - - // check mem map - checkFeNodesMapValid(); - } catch (Exception e) { - LOG.warn("diff cluster has exception, {}", e.getMessage(), e); - } - } - - // Metric - clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend(); - Map<String, String> clusterNameToId = Env.getCurrentSystemInfo().getCloudClusterNameToId(); - for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) { - long aliveNum = 0L; - List<Backend> bes = clusterIdToBackend.get(entry.getValue()); - if (bes == null || bes.size() == 0) { - LOG.info("cant get be nodes by cluster {}, bes {}", entry, bes); - continue; - } - for (Backend backend : bes) { - MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(), key -> { - GaugeMetricImpl<Integer> backendAlive = new GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT, - "backend alive or not"); - backendAlive.addLabel(new MetricLabel("cluster_id", entry.getValue())); - backendAlive.addLabel(new MetricLabel("cluster_name", entry.getKey())); - backendAlive.addLabel(new MetricLabel("address", key)); - MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive); - return backendAlive; - }).setValue(backend.isAlive() ? 1 : 0); - aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum; - } - - MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(), key -> { - GaugeMetricImpl<Long> backendAliveTotal = new GaugeMetricImpl<>("backend_alive_total", - MetricUnit.NOUNIT, "backend alive num in cluster"); - backendAliveTotal.addLabel(new MetricLabel("cluster_id", entry.getValue())); - backendAliveTotal.addLabel(new MetricLabel("cluster_name", key)); - MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal); - return backendAliveTotal; - }).setValue(aliveNum); - } - - LOG.info("daemon cluster get cluster info succ, current cloudClusterIdToBackendMap: {}", - Env.getCurrentSystemInfo().getCloudClusterIdToBackend()); - getObserverFes(); + getCloudBackends(); + updateCloudMetrics(); + getCloudObserverFes(); } private void checkFeNodesMapValid() { @@ -403,7 +335,7 @@ public class CloudClusterChecker extends MasterDaemon { } } - private void getObserverFes() { + private void getCloudObserverFes() { Cloud.GetClusterResponse response = CloudSystemInfoService .getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, ""); if (!response.hasStatus() || !response.getStatus().hasCode() @@ -470,5 +402,84 @@ public class CloudClusterChecker extends MasterDaemon { LOG.warn("update cloud frontends exception e: {}, msg: {}", e, e.getMessage()); } } + + private void getCloudBackends() { + Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend(); + //rpc to ms, to get mysql user can use cluster_id + // NOTE: rpc args all empty, use cluster_unique_id to get a instance's all cluster info. + Cloud.GetClusterResponse response = CloudSystemInfoService.getCloudCluster("", "", ""); + if (!response.hasStatus() || !response.getStatus().hasCode() + || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK + && response.getStatus().getCode() != MetaServiceCode.CLUSTER_NOT_FOUND)) { + LOG.warn("failed to get cloud cluster due to incomplete response, " + + "cloud_unique_id={}, response={}", Config.cloud_unique_id, response); + return; + } + Set<String> localClusterIds = clusterIdToBackend.keySet(); + // clusterId -> clusterPB + Map<String, ClusterPB> remoteClusterIdToPB = response.getClusterList().stream() + .filter(c -> c.getType() != Type.SQL) + .collect(Collectors.toMap(ClusterPB::getClusterId, clusterPB -> clusterPB)); + LOG.info("get cluster info clusterIds: {}", remoteClusterIdToPB); + + try { + // cluster_ids diff remote <clusterId, nodes> and local <clusterId, nodes> + // remote - local > 0, add bes to local + checkToAddCluster(remoteClusterIdToPB, localClusterIds); + + // local - remote > 0, drop bes from local + checkToDelCluster(remoteClusterIdToPB, localClusterIds, clusterIdToBackend); + + if (remoteClusterIdToPB.keySet().size() != clusterIdToBackend.keySet().size()) { + LOG.warn("impossible cluster id size not match, check it local {}, remote {}", + clusterIdToBackend, remoteClusterIdToPB); + } + // clusterID local == remote, diff nodes + checkDiffNode(remoteClusterIdToPB, clusterIdToBackend); + + // check mem map + checkFeNodesMapValid(); + } catch (Exception e) { + LOG.warn("diff cluster has exception, {}", e.getMessage(), e); + + } + LOG.info("daemon cluster get cluster info succ, current cloudClusterIdToBackendMap: {}", + Env.getCurrentSystemInfo().getCloudClusterIdToBackend()); + } + + private void updateCloudMetrics() { + // Metric + Map<String, List<Backend>> clusterIdToBackend = Env.getCurrentSystemInfo().getCloudClusterIdToBackend(); + Map<String, String> clusterNameToId = Env.getCurrentSystemInfo().getCloudClusterNameToId(); + for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) { + long aliveNum = 0L; + List<Backend> bes = clusterIdToBackend.get(entry.getValue()); + if (bes == null || bes.size() == 0) { + LOG.info("cant get be nodes by cluster {}, bes {}", entry, bes); + continue; + } + for (Backend backend : bes) { + MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(), key -> { + GaugeMetricImpl<Integer> backendAlive = new GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT, + "backend alive or not"); + backendAlive.addLabel(new MetricLabel("cluster_id", entry.getValue())); + backendAlive.addLabel(new MetricLabel("cluster_name", entry.getKey())); + backendAlive.addLabel(new MetricLabel("address", key)); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive); + return backendAlive; + }).setValue(backend.isAlive() ? 1 : 0); + aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum; + } + + MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(), key -> { + GaugeMetricImpl<Long> backendAliveTotal = new GaugeMetricImpl<>("backend_alive_total", + MetricUnit.NOUNIT, "backend alive num in cluster"); + backendAliveTotal.addLabel(new MetricLabel("cluster_id", entry.getValue())); + backendAliveTotal.addLabel(new MetricLabel("cluster_name", key)); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal); + return backendAliveTotal; + }).setValue(aliveNum); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org