This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c7b96a411f9 branch-4.0: [fix](cloud)Fix read from peer use thread pool not asyncio #57587 (#59000)
c7b96a411f9 is described below

commit c7b96a411f99995de11a3170f373e2330472b320
Author: deardeng <[email protected]>
AuthorDate: Mon Dec 22 14:37:33 2025 +0800

    branch-4.0: [fix](cloud)Fix read from peer use thread pool not asyncio #57587 (#59000)
    
    cherry pick from #57587
---
 be/src/cloud/config.cpp                            |   2 +-
 be/src/io/cache/block_file_cache_downloader.cpp    |   8 ++
 .../main/java/org/apache/doris/common/Config.java  |  14 +--
 .../doris/cloud/catalog/CloudTabletRebalancer.java |   4 +-
 .../cloud_p0/balance/test_balance_metrics.groovy   | 110 +++++++++++++++++++++
 .../balance/test_peer_read_async_warmup.groovy     |   1 +
 .../test_drop_cluster_clean_metrics.groovy         | 108 ++++++++++++++++++++
 .../test_fe_tablet_same_backend.groovy             |   2 +-
 .../cloud_p0/multi_cluster/test_rebalance.groovy   |   6 +-
 9 files changed, 241 insertions(+), 14 deletions(-)

diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index ab31b9868f5..b915c1e0034 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -135,7 +135,7 @@ DEFINE_mBool(enable_standby_passive_compaction, "true");
 
 DEFINE_mDouble(standby_compaction_version_ratio, "0.8");
 
-DEFINE_mBool(enable_cache_read_from_peer, "false");
+DEFINE_mBool(enable_cache_read_from_peer, "true");
 
 // Cache the expiration time of the peer address.
// This can be configured to be less than the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration.
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index ba28d9f3479..567eab3ad98 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -319,6 +319,14 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
 
     std::unique_ptr<char[]> buffer(new char[one_single_task_size]);
 
+    DBUG_EXECUTE_IF("FileCacheBlockDownloader::download_segment_file_sleep", {
+        auto sleep_time = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                "FileCacheBlockDownloader::download_segment_file_sleep", "sleep_time", 3);
+        LOG(INFO) << "FileCacheBlockDownloader::download_segment_file_sleep: sleep_time="
+                  << sleep_time;
+        sleep(sleep_time);
+    });
+
     size_t task_offset = 0;
     for (size_t i = 0; i < task_num; i++) {
         size_t offset = meta.offset + task_offset;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0ca1f271a7a..eb33df5003c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3375,13 +3375,13 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int cloud_min_balance_tablet_num_per_run = 2;
 
-    @ConfField(description = {"指定存算分离模式下所有 Compute group 的扩缩容预热方式。"
-            + "without_warmup: 直接修改 tablet 分片映射,首次读从 S3 拉取,均衡最快但性能波动最大;"
-            + "async_warmup: 异步预热,尽力而为拉取 cache,均衡较快但可能 cache miss;"
-            + "sync_warmup: 同步预热,确保 cache 迁移完成,均衡较慢但无 cache miss;"
-            + "peer_read_async_warmup: 直接修改 tablet 分片映射,首次读从 Peer BE 
拉取,均衡最快可能会影响同计算组中其他 BE 性能。"
-            + "注意:此为全局 FE 配置,也可通过 SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
-            + "设置 compute group 维度的 balance 类型,compute group 维度配置优先级更高",
+    @ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
+            + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
+            + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
+            + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
+            + "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer 
BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
+            + "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
+            + "设置compute group维度的balance类型,compute group维度配置优先级更高",
         "Specify the scaling and warming methods for all Compute groups in a 
cloud mode. "
             + "without_warmup: Directly modify shard mapping, first read from 
S3,"
             + "fastest re-balance but largest fluctuation; "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 654eb91cf78..019fb33aeff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -988,7 +988,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe);
             } catch (Exception e) {
                 LOG.warn("Failed to preheat tablet {} from {} to {}, "
-                                + "help msg turn off fe config enable_cloud_warm_up_for_rebalance",
+                                + "help msg change fe config cloud_warm_up_for_rebalance_type to without_warmup, ",
                         task.pickedTablet.getId(), task.srcBe, task.destBe, e);
             }
         }
@@ -1286,7 +1286,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             sendPreHeatingRpc(pickedTablet, srcBe, destBe);
         } catch (Exception e) {
             LOG.warn("Failed to preheat tablet {} from {} to {}, "
-                    + "help msg turn off fe config enable_cloud_warm_up_for_rebalance",
+                    + "help msg change fe config cloud_warm_up_for_rebalance_type to without_warmup ",
                     pickedTablet.getId(), srcBe, destBe, e);
             return;
         }
diff --git a/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
new file mode 100644
index 00000000000..eb81ad69524
--- /dev/null
+++ b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_metrics', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=2',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1',
+        'rehash_tablet_after_be_dead_seconds=3600',
+        'cloud_warm_up_for_rebalance_type=without_warmup'
+    ]
+    options.beConfigs += [
+        'report_tablet_interval_seconds=1',
+        'schedule_sync_tablets_interval_s=18000',
+        'disable_auto_compaction=true',
+        'sys_log_verbose_modules=*'
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    options.enableDebugPoints()
+
+    def getFEMetrics = {ip, port, name ->
+        def url = "http://${ip}:${port}/metrics";
+        logger.info("getFEMetrics1, url: ${url}, name: ${name}")
+        def metrics = new URL(url).text
+        def pattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name) + "\\s+(\\d+)")
+        def matcher = pattern.matcher(metrics)
+        if (matcher.find()) {
+            def ret = matcher[0][1] as long
+            logger.info("getFEMetrics2, ${url}, name:${name}, value:${ret}")
+            return ret
+        } else {
+            throw new RuntimeException("${name} not found for ${ip}:${port}")
+        }
+    }
+    
+    def testCase = { table -> 
+        def master = cluster.getMasterFe()
+        def allEditlogNum = 0;
+        def future = thread {
+            awaitUntil(300) {
+                def name = """doris_fe_cloud_partition_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"}"""
+                def value = getFEMetrics(master.host, master.httpPort, name)
+                allEditlogNum += value
+                logger.info("balance metrics value: ${value}, allEditlogNum: 
${allEditlogNum}")
+                return value == 0 && allEditlogNum > 0
+            }
+        }
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `v1` VARCHAR(2048)
+            )
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 200
+            PROPERTIES (
+            "replication_num"="1"
+            );
+        """
+        // generate some balance tasks
+        cluster.addBackend(1)
+        future.get()
+        // wait for rebalancer to do its job
+        assertTrue(allEditlogNum > 0, "balance metrics not increased")
+
+        allEditlogNum = 0
+        for (i in 0..30) {
+            sleep(1000)
+            def name = """doris_fe_cloud_partition_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"}"""
+            def value = getFEMetrics(master.host, master.httpPort, name)
+            allEditlogNum += value
+            logger.info("Final balance metrics value: ${value}, allEditlogNum: 
${allEditlogNum}")
+        }
+        // after all balance tasks done, the metric should not increase
+        assertTrue(allEditlogNum == 0, "final balance metrics not increased")
+
+        cluster.addBackend(1, "other_cluster")
+        sleep(5000)
+        def name = """doris_fe_cloud_partition_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"}"""
+        def value = getFEMetrics(master.host, master.httpPort, name)
+        logger.info("other cluster balance metrics value: ${value}")
+    }
+
+    docker(options) {
+        testCase("test_balance_metrics_tbl")
+    }
+}
diff --git a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
index 38aa4367878..ba9a61409bb 100644
--- a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
@@ -38,6 +38,7 @@ suite('test_peer_read_async_warmup', 'docker') {
         'schedule_sync_tablets_interval_s=18000',
         'disable_auto_compaction=true',
         'sys_log_verbose_modules=*',
+        'enable_cache_read_from_peer=true',
     ]
     options.setFeNum(1)
     options.setBeNum(1)
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
new file mode 100644
index 00000000000..d5f2e24c50c
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonOutput
+
+suite('test_drop_cluster_clean_metrics', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=2',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1',
+        'rehash_tablet_after_be_dead_seconds=3600',
+        'cloud_warm_up_for_rebalance_type=without_warmup'
+    ]
+    options.beConfigs += [
+        'report_tablet_interval_seconds=1',
+        'schedule_sync_tablets_interval_s=18000',
+        'disable_auto_compaction=true',
+        'sys_log_verbose_modules=*'
+    ]
+    options.setFeNum(2)
+    options.setBeNum(2)
+    options.cloudMode = true
+    options.enableDebugPoints()
+
+    def drop_cluster_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/drop_cluster?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    def getFEMetrics = {ip, port, name ->
+        def url = "http://${ip}:${port}/metrics";
+        logger.info("getFEMetrics1, url: ${url}, name: ${name}")
+        def metrics = new URL(url).text
+
+        def metricLinePattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name))
+
+        def matcher = metricLinePattern.matcher(metrics)
+        boolean found = false
+        while (matcher.find()) {
+            found = true
+            logger.info("getFEMetrics MATCH FOUND: ${matcher.group(0)}")
+        }
+
+        if (found) {
+            return true
+        } else {
+            def snippet = metrics.length() > 2000 ? metrics.substring(0, 2000) + "..." : metrics
+            logger.info("getFEMetrics NO MATCH for name=${name}, metrics snippet:\n${snippet}")
+            return false
+        }
+    }
+
+    def testCase = { -> 
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        def fe = cluster.getOneFollowerFe();
+        sleep(3000) // wait for metrics ready
+        def metrics1 = """cluster_id="compute_cluster_id","""
+        assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics1))
+
+        // drop compute cluster
+        def beClusterMap = [cluster_id:"compute_cluster_id"]
+        def instance = [instance_id: "default_instance_id", cluster: beClusterMap]
+        def jsonOutput = new JsonOutput()
+        def dropFeClusterBody = jsonOutput.toJson(instance)
+        drop_cluster_api.call(msHttpPort, dropFeClusterBody) {
+            respCode, body ->
+                log.info("drop fe cluster http cli result: ${body} 
${respCode}".toString())
+                def json = parseJson(body)
+        }
+        sleep(3000) // wait for metrics cleaned
+        assertFalse(getFEMetrics(fe.host, fe.httpPort, metrics1))
+
+        cluster.addBackend(2, "new_cluster") 
+
+        sleep(3000) // wait for metrics cleaned
+        def metrics2 = """cluster_id="new_cluster_id","""
+        assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics2))
+    }
+
+    docker(options) {
+        testCase()
+    }
+}
\ No newline at end of file
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
index 5ecc610bf5a..9f24d1b2dbf 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
@@ -107,7 +107,7 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
     def options = new ClusterOptions()
     options.feConfigs += [
         'cloud_cluster_check_interval_second=1',
-        'enable_cloud_warm_up_for_rebalance=true',
+        'cloud_warm_up_for_rebalance_type=async_warmup',
         'cloud_tablet_rebalancer_interval_second=1',
         'cloud_balance_tablet_percent_per_run=1.0',
     ]
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
index 83a8dc336de..81f2227ce44 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -40,8 +40,8 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
             'sys_log_verbose_modules=org',
         ]
     }
-    clusterOptions[0].feConfigs += ['enable_cloud_warm_up_for_rebalance=true',             'cloud_pre_heating_time_limit_sec=300']
-    clusterOptions[1].feConfigs += ['enable_cloud_warm_up_for_rebalance=false']
+    clusterOptions[0].feConfigs += ['cloud_warm_up_for_rebalance_type=sync_warmup','cloud_pre_heating_time_limit_sec=300']
+    clusterOptions[1].feConfigs += ['cloud_warm_up_for_rebalance_type=without_warmup']
 
 
     for (int i = 0; i < clusterOptions.size(); i++) {
@@ -178,7 +178,7 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
             // add a be
             cluster.addBackend(1, null)
             // warm up
-            sql """admin set frontend config("enable_cloud_warm_up_for_rebalance"="true")"""
+            sql """admin set frontend config("cloud_warm_up_for_rebalance_type"="sync_warmup")"""
 
             // test rebalance thread still work
             awaitUntil(30) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
