This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8b4e1845ad5 [fix](cloud) Fix the residual metrics of cluster after drop compute group (#57235)
8b4e1845ad5 is described below
commit 8b4e1845ad51fa0896838a3b25811b0c8f7efe42
Author: deardeng <[email protected]>
AuthorDate: Wed Oct 29 11:15:02 2025 +0800
    [fix](cloud) Fix the residual metrics of cluster after drop compute group (#57235)
---
 .../doris/cloud/catalog/CloudClusterChecker.java   |   2 +-
 .../cloud/catalog/CloudInstanceStatusChecker.java  |   2 +
 .../doris/cloud/system/CloudSystemInfoService.java |   1 +
 .../org/apache/doris/metric/AutoMappedMetric.java  |   3 +
 .../java/org/apache/doris/metric/MetricRepo.java   | 103 ++++++++++++++++++++
 .../test_drop_cluster_clean_metrics.groovy         | 108 +++++++++++++++++++++
6 files changed, 218 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 4ba162dfe11..338d619604f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -152,10 +152,10 @@ public class CloudClusterChecker extends MasterDaemon {
                 // del clusterName
                 String delClusterName = cloudSystemInfoService.getClusterNameByClusterId(delId);
                 if (delClusterName.isEmpty()) {
-                    LOG.warn("can't get delClusterName, clusterId: {}, plz check", delId);
                     return;
                 }
                 // del clusterID
+                MetricRepo.unregisterCloudMetrics(delId, delClusterName, toDel);
                 cloudSystemInfoService.dropCluster(delId, delClusterName);
             }
         );
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
index 044f24d2242..e6cadf0e0e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudInstanceStatusChecker.java
@@ -469,6 +469,8 @@ public class CloudInstanceStatusChecker extends MasterDaemon {
             // in fe mem, but not in meta server
             if (!msVirtualClusters.contains(computeGroup.getId())) {
                 LOG.info("virtual compute group {} will be removed.", computeGroup.getName());
+                MetricRepo.unregisterCloudMetrics(computeGroup.getId(), computeGroup.getName(),
+                        Collections.emptyList());
                 cloudSystemInfoService.removeComputeGroup(computeGroup.getId(), computeGroup.getName());
                 // cancel invalid job
                 if (!computeGroup.getPolicy().getCacheWarmupJobIds().isEmpty()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 62ab6e3b9ec..9f30f4a0590 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -406,6 +406,7 @@ public class CloudSystemInfoService extends SystemInfoService {
         // ATTN: Empty clusters are treated as dropped clusters.
         if (be.isEmpty()) {
             LOG.info("del clusterId {} and clusterName {} due to be nodes eq 0", clusterId, clusterName);
+            MetricRepo.unregisterCloudMetrics(clusterId, clusterName, toDel);
             boolean succ = clusterNameToId.remove(clusterName, clusterId);
             // remove from computeGroupIdToComputeGroup
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java
index 440e00330b4..5b348c73b15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java
@@ -38,4 +38,7 @@ public class AutoMappedMetric<M> {
         return nameToMetric;
     }
 
+    public void remove(String name) {
+        nameToMetric.remove(name);
+    }
 }
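
For context, AutoMappedMetric lazily creates one metric per name via getOrAdd; the remove(String) added above is its counterpart, so the entry for a dropped compute group can be evicted from the per-name map instead of lingering. A minimal, self-contained sketch of that contract (this is not the Doris class; the ConcurrentHashMap-backed map and the factory constructor are assumptions for illustration):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    // Sketch only: mirrors the getOrAdd/remove contract that AutoMappedMetric exposes.
    // Metrics are created lazily per name (per cluster id in the cloud code); without
    // remove(), the entry for a dropped cluster would stay in the map indefinitely.
    class AutoMappedMetricSketch<M> {
        private final Map<String, M> nameToMetric = new ConcurrentHashMap<>();
        private final Function<String, M> factory; // assumption: how new metrics get built

        AutoMappedMetricSketch(Function<String, M> factory) {
            this.factory = factory;
        }

        M getOrAdd(String name) {
            return nameToMetric.computeIfAbsent(name, factory);
        }

        void remove(String name) { // the counterpart this commit adds
            nameToMetric.remove(name);
        }
    }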
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 2b611e8699c..eb5fa69c9f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -1593,4 +1593,107 @@ public final class MetricRepo {
         counter.setLabels(labels);
         MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
     }
+
+    public static void unregisterCloudMetrics(String clusterId, String clusterName, List<Backend> backends) {
+        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterId)) {
+            return;
+        }
+        LOG.debug("unregister cloud metrics for cluster {}", clusterId);
+        try {
+            List<MetricLabel> labels = new ArrayList<>();
+            labels.add(new MetricLabel("cluster_id", clusterId));
+            labels.add(new MetricLabel("cluster_name", clusterName));
+
+            LongCounterMetric requestAllCounter = CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(requestAllCounter.getName(), labels);
+
+            LongCounterMetric queryAllCounter = CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_QUERY_ALL_COUNTER.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryAllCounter.getName(), labels);
+
+            LongCounterMetric queryErrCounter = CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_QUERY_ERR_COUNTER.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryErrCounter.getName(), labels);
+
+            LongCounterMetric warmUpJobExecCounter = CloudMetrics.CLUSTER_WARM_UP_JOB_EXEC_COUNT.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_WARM_UP_JOB_EXEC_COUNT.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobExecCounter.getName(), labels);
+
+            LongCounterMetric warmUpJobRequestedTablets =
+                    CloudMetrics.CLUSTER_WARM_UP_JOB_REQUESTED_TABLETS.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_WARM_UP_JOB_REQUESTED_TABLETS.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobRequestedTablets.getName(), labels);
+
+            LongCounterMetric warmUpJobFinishedTablets =
+                    CloudMetrics.CLUSTER_WARM_UP_JOB_FINISHED_TABLETS.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_WARM_UP_JOB_FINISHED_TABLETS.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(warmUpJobFinishedTablets.getName(), labels);
+
+            GaugeMetricImpl<Double> requestPerSecondGauge = CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE
+                    .getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_REQUEST_PER_SECOND_GAUGE.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(requestPerSecondGauge.getName(), labels);
+
+            GaugeMetricImpl<Double> queryPerSecondGauge = CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE
+                    .getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_QUERY_PER_SECOND_GAUGE.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryPerSecondGauge.getName(), labels);
+
+            GaugeMetricImpl<Double> queryErrRateGauge = CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_QUERY_ERR_RATE_GAUGE.remove(clusterId);
+            DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(queryErrRateGauge.getName(), labels);
+
+            LongCounterMetric clusterCloudPartitionBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_PARTITION_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_PARTITION_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudPartitionBalanceNum.getName(), labels);
+
+            LongCounterMetric clusterCloudTableBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_TABLE_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_TABLE_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudTableBalanceNum.getName(), labels);
+
+            LongCounterMetric clusterCloudGlobalBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_GLOBAL_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudGlobalBalanceNum.getName(), labels);
+
+            LongCounterMetric clusterCloudUpgradeBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudUpgradeBalanceNum.getName(), labels);
+
+            LongCounterMetric clusterCloudWarmUpBalanceNum = CloudMetrics
+                    .CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER
+                    .removeMetricsByNameAndLabels(clusterCloudWarmUpBalanceNum.getName(), labels);
+
+            METRIC_REGISTER.getHistograms().keySet().stream()
+                    .filter(k -> k.contains(clusterId))
+                    .forEach(METRIC_REGISTER::remove);
+
+            for (Backend backend : backends) {
+                List<MetricLabel> backendLabels = new ArrayList<>();
+                backendLabels.add(new MetricLabel("cluster_id", clusterId));
+                backendLabels.add(new MetricLabel("cluster_name", clusterName));
+                backendLabels.add(new MetricLabel("address", backend.getAddress()));
+                String key = clusterId + "_" + backend.getAddress();
+                GaugeMetricImpl<Integer> metric = CloudMetrics.CLUSTER_BACKEND_ALIVE.getOrAdd(key);
+                MetricRepo.DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(metric.getName(), backendLabels);
+            }
+
+            GaugeMetricImpl<Integer> backendAliveTotal = CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.getOrAdd(clusterId);
+            CloudMetrics.CLUSTER_BACKEND_ALIVE_TOTAL.remove(clusterId);
+            MetricRepo.DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(backendAliveTotal.getName(), labels);
+
+        } catch (Throwable t) {
+            LOG.warn("unregister cloud metrics for cluster {} failed", clusterId, t);
+        }
+    }
 }
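
The cleanup above applies the same two-step pattern to every per-cluster series: evict the lazily created metric object from its AutoMappedMetric map (keyed by cluster id), then remove the already-registered series from the registry by metric name plus the cluster_id/cluster_name labels so it stops appearing in the FE /metrics output. A compact, self-contained sketch of that idea follows; the registry stand-in, series key format, and metric name are illustrative, not Doris APIs:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch only: the two-step cleanup unregisterCloudMetrics() performs per metric.
    public class UnregisterSketch {
        // Stand-in for the registry behind DORIS_METRIC_REGISTER; key = name + labels.
        static final Map<String, Long> REGISTRY = new ConcurrentHashMap<>();

        static String seriesKey(String name, String labels) {
            return name + "{" + labels + "}";
        }

        public static void main(String[] args) {
            String clusterId = "compute_cluster_id";
            String labels = "cluster_id=\"" + clusterId + "\",cluster_name=\"compute_cluster\"";
            String name = "cloud_cluster_request_all";            // illustrative metric name

            REGISTRY.put(seriesKey(name, labels), 42L);            // registered while the cluster lived

            // Step 1 (real code): CloudMetrics.CLUSTER_REQUEST_ALL_COUNTER.remove(clusterId)
            // Step 2: drop the exported series by name + labels, analogous to
            //         DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(name, labels)
            REGISTRY.remove(seriesKey(name, labels));

            System.out.println("remaining series: " + REGISTRY);   // prints {} once cleaned
        }
    }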
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
new file mode 100644
index 00000000000..91410e246c1
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonOutput
+
+suite('test_drop_cluster_clean_metrics', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=2',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'enable_cloud_warm_up_for_rebalance=false'
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*'
+ ]
+ options.setFeNum(2)
+ options.setBeNum(2)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def drop_cluster_api = { msHttpPort, request_body, check_func ->
+ httpTest {
+ endpoint msHttpPort
+ uri "/MetaService/http/drop_cluster?token=$token"
+ body request_body
+ check check_func
+ }
+ }
+
+ def getFEMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/metrics"
+ logger.info("getFEMetrics1, url: ${url}, name: ${name}")
+ def metrics = new URL(url).text
+
+        def metricLinePattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name))
+
+ def matcher = metricLinePattern.matcher(metrics)
+ boolean found = false
+ while (matcher.find()) {
+ found = true
+ logger.info("getFEMetrics MATCH FOUND: ${matcher.group(0)}")
+ }
+
+ if (found) {
+ return true
+ } else {
+            def snippet = metrics.length() > 2000 ? metrics.substring(0, 2000) + "..." : metrics
+            logger.info("getFEMetrics NO MATCH for name=${name}, metrics snippet:\n${snippet}")
+ return false
+ }
+ }
+
+ def testCase = { ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ def fe = cluster.getOneFollowerFe();
+ sleep(3000) // wait for metrics ready
+ def metrics1 = """cluster_id="compute_cluster_id","""
+ assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics1))
+
+ // drop compute cluster
+ def beClusterMap = [cluster_id:"compute_cluster_id"]
+        def instance = [instance_id: "default_instance_id", cluster: beClusterMap]
+ def jsonOutput = new JsonOutput()
+ def dropFeClusterBody = jsonOutput.toJson(instance)
+ drop_cluster_api.call(msHttpPort, dropFeClusterBody) {
+ respCode, body ->
+                log.info("drop fe cluster http cli result: ${body} ${respCode}".toString())
+ def json = parseJson(body)
+ }
+ sleep(3000) // wait for metrics cleaned
+ assertFalse(getFEMetrics(fe.host, fe.httpPort, metrics1))
+
+ cluster.addBackend(2, "new_cluster")
+
+ sleep(3000) // wait for metrics cleaned
+ def metrics2 = """cluster_id="new_cluster_id","""
+ assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics2))
+ }
+
+ docker(options) {
+ testCase()
+ }
+}
\ No newline at end of file
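
To spot residual series by hand outside the docker suite, the same check the test automates can be run against the FE /metrics endpoint. A small sketch; the host, port, and the "compute_cluster_id" label value are placeholders matching the suite's defaults, not fixed values:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Scrape the FE /metrics endpoint and look for a cluster_id label that should
    // be gone once the compute group has been dropped.
    public class CheckClusterMetrics {
        public static void main(String[] args) throws Exception {
            String feHost = args.length > 0 ? args[0] : "127.0.0.1";   // placeholder FE host
            int feHttpPort = args.length > 1 ? Integer.parseInt(args[1]) : 8030; // placeholder http port

            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://" + feHost + ":" + feHttpPort + "/metrics"))
                    .GET()
                    .build();
            String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();

            String needle = "cluster_id=\"compute_cluster_id\",";
            // After the compute group is dropped, no metric line should carry this label anymore.
            System.out.println(body.contains(needle)
                    ? "residual metrics still present"
                    : "metrics cleaned");
        }
    }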
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]