This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dade1c03c6 [feature](merge-cloud) Fix cloud cluster checker cant get backends (#30467)
3dade1c03c6 is described below

commit 3dade1c03c63ae59658aecbc5e42cd8d1d152776
Author: deardeng <565620...@qq.com>
AuthorDate: Mon Jan 29 20:06:58 2024 +0800

    [feature](merge-cloud) Fix cloud cluster checker cant get backends (#30467)
---
 .../doris/cloud/catalog/CloudClusterChecker.java   | 157 +++++++++++----------
 1 file changed, 84 insertions(+), 73 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 527909c7bf0..5288cd73ea7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -20,6 +20,7 @@ package org.apache.doris.cloud.catalog;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.proto.Cloud.ClusterPB;
+import org.apache.doris.cloud.proto.Cloud.ClusterPB.Type;
 import org.apache.doris.cloud.proto.Cloud.ClusterStatus;
 import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
@@ -291,78 +292,9 @@ public class CloudClusterChecker extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        Map<String, List<Backend>> clusterIdToBackend = 
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
-        //rpc to ms, to get mysql user can use cluster_id
-        // NOTE: rpc args all empty, use cluster_unique_id to get a instance's 
all cluster info.
-        Cloud.GetClusterResponse response = 
CloudSystemInfoService.getCloudCluster("", "", "");
-        if (!response.hasStatus() || !response.getStatus().hasCode()
-                || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
-                && response.getStatus().getCode() != 
MetaServiceCode.CLUSTER_NOT_FOUND)) {
-            LOG.warn("failed to get cloud cluster due to incomplete response, "
-                    + "cloud_unique_id={}, response={}", 
Config.cloud_unique_id, response);
-        } else {
-            // clusterId -> clusterPB
-            Map<String, ClusterPB> remoteClusterIdToPB = new HashMap<>();
-            Set<String> localClusterIds = clusterIdToBackend.keySet();
-
-            try {
-                // cluster_ids diff remote <clusterId, nodes> and local 
<clusterId, nodes>
-                // remote - local > 0, add bes to local
-                checkToAddCluster(remoteClusterIdToPB, localClusterIds);
-
-                // local - remote > 0, drop bes from local
-                checkToDelCluster(remoteClusterIdToPB, localClusterIds, 
clusterIdToBackend);
-
-                if (remoteClusterIdToPB.keySet().size() != 
clusterIdToBackend.keySet().size()) {
-                    LOG.warn("impossible cluster id size not match, check it 
local {}, remote {}",
-                            clusterIdToBackend, remoteClusterIdToPB);
-                }
-                // clusterID local == remote, diff nodes
-                checkDiffNode(remoteClusterIdToPB, clusterIdToBackend);
-
-                // check mem map
-                checkFeNodesMapValid();
-            } catch (Exception e) {
-                LOG.warn("diff cluster has exception, {}", e.getMessage(), e);
-            }
-        }
-
-        // Metric
-        clusterIdToBackend = 
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
-        Map<String, String> clusterNameToId = 
Env.getCurrentSystemInfo().getCloudClusterNameToId();
-        for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
-            long aliveNum = 0L;
-            List<Backend> bes = clusterIdToBackend.get(entry.getValue());
-            if (bes == null || bes.size() == 0) {
-                LOG.info("cant get be nodes by cluster {}, bes {}", entry, 
bes);
-                continue;
-            }
-            for (Backend backend : bes) {
-                
MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(), 
key -> {
-                    GaugeMetricImpl<Integer> backendAlive = new 
GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT,
-                            "backend alive or not");
-                    backendAlive.addLabel(new MetricLabel("cluster_id", 
entry.getValue()));
-                    backendAlive.addLabel(new MetricLabel("cluster_name", 
entry.getKey()));
-                    backendAlive.addLabel(new MetricLabel("address", key));
-                    MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive);
-                    return backendAlive;
-                }).setValue(backend.isAlive() ? 1 : 0);
-                aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum;
-            }
-
-            
MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(), 
key -> {
-                GaugeMetricImpl<Long> backendAliveTotal = new 
GaugeMetricImpl<>("backend_alive_total",
-                        MetricUnit.NOUNIT, "backend alive num in cluster");
-                backendAliveTotal.addLabel(new MetricLabel("cluster_id", 
entry.getValue()));
-                backendAliveTotal.addLabel(new MetricLabel("cluster_name", 
key));
-                MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal);
-                return backendAliveTotal;
-            }).setValue(aliveNum);
-        }
-
-        LOG.info("daemon cluster get cluster info succ, current 
cloudClusterIdToBackendMap: {}",
-                Env.getCurrentSystemInfo().getCloudClusterIdToBackend());
-        getObserverFes();
+        getCloudBackends();
+        updateCloudMetrics();
+        getCloudObserverFes();
     }
 
     private void checkFeNodesMapValid() {
@@ -403,7 +335,7 @@ public class CloudClusterChecker extends MasterDaemon {
         }
     }
 
-    private void getObserverFes() {
+    private void getCloudObserverFes() {
         Cloud.GetClusterResponse response = CloudSystemInfoService
                 .getCloudCluster(Config.cloud_sql_server_cluster_name, 
Config.cloud_sql_server_cluster_id, "");
         if (!response.hasStatus() || !response.getStatus().hasCode()
@@ -470,5 +402,84 @@ public class CloudClusterChecker extends MasterDaemon {
             LOG.warn("update cloud frontends exception e: {}, msg: {}", e, 
e.getMessage());
         }
     }
+
+    private void getCloudBackends() {
+        Map<String, List<Backend>> clusterIdToBackend = 
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+        //rpc to ms, to get mysql user can use cluster_id
+        // NOTE: rpc args all empty, use cluster_unique_id to get a instance's 
all cluster info.
+        Cloud.GetClusterResponse response = 
CloudSystemInfoService.getCloudCluster("", "", "");
+        if (!response.hasStatus() || !response.getStatus().hasCode()
+                || (response.getStatus().getCode() != Cloud.MetaServiceCode.OK
+                && response.getStatus().getCode() != 
MetaServiceCode.CLUSTER_NOT_FOUND)) {
+            LOG.warn("failed to get cloud cluster due to incomplete response, "
+                    + "cloud_unique_id={}, response={}", 
Config.cloud_unique_id, response);
+            return;
+        }
+        Set<String> localClusterIds = clusterIdToBackend.keySet();
+        // clusterId -> clusterPB
+        Map<String, ClusterPB> remoteClusterIdToPB = 
response.getClusterList().stream()
+                .filter(c -> c.getType() != Type.SQL)
+                .collect(Collectors.toMap(ClusterPB::getClusterId, clusterPB 
-> clusterPB));
+        LOG.info("get cluster info  clusterIds: {}", remoteClusterIdToPB);
+
+        try {
+            // cluster_ids diff remote <clusterId, nodes> and local 
<clusterId, nodes>
+            // remote - local > 0, add bes to local
+            checkToAddCluster(remoteClusterIdToPB, localClusterIds);
+
+            // local - remote > 0, drop bes from local
+            checkToDelCluster(remoteClusterIdToPB, localClusterIds, 
clusterIdToBackend);
+
+            if (remoteClusterIdToPB.keySet().size() != 
clusterIdToBackend.keySet().size()) {
+                LOG.warn("impossible cluster id size not match, check it local 
{}, remote {}",
+                        clusterIdToBackend, remoteClusterIdToPB);
+            }
+            // clusterID local == remote, diff nodes
+            checkDiffNode(remoteClusterIdToPB, clusterIdToBackend);
+
+            // check mem map
+            checkFeNodesMapValid();
+        } catch (Exception e) {
+            LOG.warn("diff cluster has exception, {}", e.getMessage(), e);
+
+        }
+        LOG.info("daemon cluster get cluster info succ, current 
cloudClusterIdToBackendMap: {}",
+                Env.getCurrentSystemInfo().getCloudClusterIdToBackend());
+    }
+
+    private void updateCloudMetrics() {
+        // Metric
+        Map<String, List<Backend>> clusterIdToBackend = 
Env.getCurrentSystemInfo().getCloudClusterIdToBackend();
+        Map<String, String> clusterNameToId = 
Env.getCurrentSystemInfo().getCloudClusterNameToId();
+        for (Map.Entry<String, String> entry : clusterNameToId.entrySet()) {
+            long aliveNum = 0L;
+            List<Backend> bes = clusterIdToBackend.get(entry.getValue());
+            if (bes == null || bes.size() == 0) {
+                LOG.info("cant get be nodes by cluster {}, bes {}", entry, 
bes);
+                continue;
+            }
+            for (Backend backend : bes) {
+                
MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE.computeIfAbsent(backend.getAddress(), 
key -> {
+                    GaugeMetricImpl<Integer> backendAlive = new 
GaugeMetricImpl<>("backend_alive", MetricUnit.NOUNIT,
+                            "backend alive or not");
+                    backendAlive.addLabel(new MetricLabel("cluster_id", 
entry.getValue()));
+                    backendAlive.addLabel(new MetricLabel("cluster_name", 
entry.getKey()));
+                    backendAlive.addLabel(new MetricLabel("address", key));
+                    MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAlive);
+                    return backendAlive;
+                }).setValue(backend.isAlive() ? 1 : 0);
+                aliveNum = backend.isAlive() ? aliveNum + 1 : aliveNum;
+            }
+
+            
MetricRepo.CLOUD_CLUSTER_BACKEND_ALIVE_TOTAL.computeIfAbsent(entry.getKey(), 
key -> {
+                GaugeMetricImpl<Long> backendAliveTotal = new 
GaugeMetricImpl<>("backend_alive_total",
+                        MetricUnit.NOUNIT, "backend alive num in cluster");
+                backendAliveTotal.addLabel(new MetricLabel("cluster_id", 
entry.getValue()));
+                backendAliveTotal.addLabel(new MetricLabel("cluster_name", 
key));
+                MetricRepo.DORIS_METRIC_REGISTER.addMetrics(backendAliveTotal);
+                return backendAliveTotal;
+            }).setValue(aliveNum);
+        }
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to