This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f3cba4afbb4 [Opt](cloud) Refactor `OlapScanLocalState::hold_tablets` 
(#49945)
f3cba4afbb4 is described below

commit f3cba4afbb4093816c2177cbe458af59ce4930f2
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Wed Apr 23 14:26:05 2025 +0800

    [Opt](cloud) Refactor `OlapScanLocalState::hold_tablets` (#49945)
    
    ### What problem does this PR solve?
    
    In cloud mode, `ExecEnv::get_tablet` may call `sync_rowsets`, which is
    heavy and time-consuming. This PR makes these calls execute concurrently in
    `OlapScanLocalState::hold_tablets` to reduce the total time spent.
---
 be/src/pipeline/exec/olap_scan_operator.cpp | 96 +++++++++++++----------------
 1 file changed, 44 insertions(+), 52 deletions(-)

diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index f087306c9d1..f47216a271e 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -441,76 +441,68 @@ Status OlapScanLocalState::hold_tablets() {
         return Status::OK();
     }
 
-    auto update_sync_rowset_profile = [&](const SyncRowsetStats& sync_stat) {
-        COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_num, 
sync_stat.get_remote_rowsets_num);
-        COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_rpc_timer,
-                       sync_stat.get_remote_rowsets_rpc_ms);
-        COUNTER_UPDATE(_sync_rowset_get_local_delete_bitmap_rowsets_num,
-                       sync_stat.get_local_delete_bitmap_rowsets_num);
-        COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rowsets_num,
-                       sync_stat.get_remote_delete_bitmap_rowsets_num);
-        COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_key_count,
-                       sync_stat.get_remote_delete_bitmap_key_count);
-        COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_bytes,
-                       sync_stat.get_remote_delete_bitmap_bytes);
-        COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rpc_timer,
-                       sync_stat.get_remote_delete_bitmap_rpc_ms);
-    };
-
     MonotonicStopWatch timer;
     timer.start();
     _tablets.resize(_scan_ranges.size());
     _read_sources.resize(_scan_ranges.size());
-    for (size_t i = 0; i < _scan_ranges.size(); i++) {
-        int64_t version = 0;
-        std::from_chars(_scan_ranges[i]->version.data(),
-                        _scan_ranges[i]->version.data() + 
_scan_ranges[i]->version.size(), version);
-        if (config::is_cloud_mode()) {
-            int64_t duration_ns = 0;
-            SyncRowsetStats sync_stats;
-            {
-                SCOPED_RAW_TIMER(&duration_ns);
-                auto tablet =
-                        
DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, &sync_stats));
-                _tablets[i] = {std::move(tablet), version};
-            }
-            COUNTER_UPDATE(_sync_rowset_timer, duration_ns);
-            update_sync_rowset_profile(sync_stats);
-
-            // FIXME(plat1ko): Avoid pointer cast
-            
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(
-                    *_tablets[i].tablet);
-        } else {
-            auto tablet = 
DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id));
-            _tablets[i] = {std::move(tablet), version};
-        }
-    }
 
     if (config::is_cloud_mode()) {
-        int64_t duration_ns = 0;
-        std::vector<SyncRowsetStats> sync_statistics {};
-        sync_statistics.reserve(_tablets.size());
+        std::vector<SyncRowsetStats> sync_statistics(_scan_ranges.size());
+        std::vector<std::function<Status()>> tasks {};
+        tasks.reserve(_scan_ranges.size());
+        int64_t duration_ns {0};
         {
             SCOPED_RAW_TIMER(&duration_ns);
-            std::vector<std::function<Status()>> tasks;
-            tasks.reserve(_scan_ranges.size());
-            for (auto&& [cur_tablet, cur_version] : _tablets) {
-                sync_statistics.emplace_back();
-                tasks.emplace_back([cur_tablet, cur_version, stats = 
&sync_statistics.back()]() {
+            for (size_t i = 0; i < _scan_ranges.size(); i++) {
+                auto* sync_stats = &sync_statistics[i];
+                int64_t version = 0;
+                std::from_chars(_scan_ranges[i]->version.data(),
+                                _scan_ranges[i]->version.data() + 
_scan_ranges[i]->version.size(),
+                                version);
+                tasks.emplace_back([this, sync_stats, version, i]() {
+                    auto tablet =
+                            
DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats));
+                    _tablets[i] = {std::move(tablet), version};
                     SyncOptions options;
-                    options.query_version = cur_version;
+                    options.query_version = version;
                     options.merge_schema = true;
-                    return std::dynamic_pointer_cast<CloudTablet>(cur_tablet)
-                            ->sync_rowsets(options, stats);
+                    
RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablets[i].tablet)
+                                            ->sync_rowsets(options, 
sync_stats));
+                    // FIXME(plat1ko): Avoid pointer cast
+                    
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(
+                            *_tablets[i].tablet);
+                    return Status::OK();
                 });
             }
             RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
         }
         COUNTER_UPDATE(_sync_rowset_timer, duration_ns);
         for (const auto& sync_stats : sync_statistics) {
-            update_sync_rowset_profile(sync_stats);
+            COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_num, 
sync_stats.get_remote_rowsets_num);
+            COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_rpc_timer,
+                           sync_stats.get_remote_rowsets_rpc_ms);
+            COUNTER_UPDATE(_sync_rowset_get_local_delete_bitmap_rowsets_num,
+                           sync_stats.get_local_delete_bitmap_rowsets_num);
+            COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rowsets_num,
+                           sync_stats.get_remote_delete_bitmap_rowsets_num);
+            COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_key_count,
+                           sync_stats.get_remote_delete_bitmap_key_count);
+            COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_bytes,
+                           sync_stats.get_remote_delete_bitmap_bytes);
+            COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rpc_timer,
+                           sync_stats.get_remote_delete_bitmap_rpc_ms);
+        }
+    } else {
+        for (size_t i = 0; i < _scan_ranges.size(); i++) {
+            int64_t version = 0;
+            std::from_chars(_scan_ranges[i]->version.data(),
+                            _scan_ranges[i]->version.data() + 
_scan_ranges[i]->version.size(),
+                            version);
+            auto tablet = 
DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id));
+            _tablets[i] = {std::move(tablet), version};
         }
     }
+
     for (size_t i = 0; i < _scan_ranges.size(); i++) {
         RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers({0, 
_tablets[i].version},
                                                                
&_read_sources[i].rs_splits,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to