This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9305725f5ed [branch-3.0](pick) pick #47501 #47535 (#47602)
9305725f5ed is described below

commit 9305725f5ed7e983182eca5f6e4078406efc8266
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Sat Feb 8 15:15:24 2025 +0800

    [branch-3.0](pick) pick #47501 #47535 (#47602)
---
 be/src/common/config.cpp                           |   3 +
 be/src/common/config.h                             |   3 +
 be/src/olap/parallel_scanner_builder.cpp           |  11 +-
 be/src/olap/parallel_scanner_builder.h             |   5 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        | 119 ++++++++++++++-------
 be/src/pipeline/exec/olap_scan_operator.h          |   5 +
 be/src/pipeline/exec/operator.h                    |   2 +
 be/src/pipeline/local_exchange/local_exchanger.cpp |   4 +-
 be/src/pipeline/pipeline_task.cpp                  |   3 +
 9 files changed, 109 insertions(+), 46 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 73d801f0828..089766214cf 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1031,6 +1031,9 @@ DEFINE_mInt32(segcompaction_num_threads, "5");
 // enable java udf and jdbc scannode
 DEFINE_Bool(enable_java_support, "true");
 
+// enable prefetch tablets before opening
+DEFINE_mBool(enable_prefetch_tablet, "true");
+
 // Set config randomly to check more issues in github workflow
 DEFINE_Bool(enable_fuzzy_mode, "false");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a750c48e921..e3900c0dd86 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1076,6 +1076,9 @@ DECLARE_mInt32(segcompaction_num_threads);
 // enable java udf and jdbc scannode
 DECLARE_Bool(enable_java_support);
 
+// enable prefetch tablets before opening
+DECLARE_mBool(enable_prefetch_tablet);
+
 // Set config randomly to check more issues in github workflow
 DECLARE_Bool(enable_fuzzy_mode);
 
diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp
index 88c69ab5c9a..103e6341d7c 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -164,17 +164,15 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
  */
 Status ParallelScannerBuilder::_load() {
     _total_rows = 0;
+    size_t idx = 0;
     for (auto&& [tablet, version] : _tablets) {
         const auto tablet_id = tablet->tablet_id();
-        auto& read_source = _all_read_sources[tablet_id];
-        RETURN_IF_ERROR(tablet->capture_rs_readers({0, version}, &read_source.rs_splits, false));
-        if (!_state->skip_delete_predicate()) {
-            read_source.fill_delete_predicates();
-        }
+        _all_read_sources[tablet_id] = _read_sources[idx];
+        const auto& read_source = _all_read_sources[tablet_id];
+
         bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
                                             ? _state->query_options().enable_segment_cache
                                             : true;
-
         for (auto& rs_split : read_source.rs_splits) {
             auto rowset = rs_split.rs_reader->rowset();
             RETURN_IF_ERROR(rowset->load());
@@ -190,6 +188,7 @@ Status ParallelScannerBuilder::_load() {
             }
             _total_rows += rowset->num_rows();
         }
+        idx++;
     }
 
     _rows_per_scanner = _total_rows / _max_scanners_count;
diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h
index 7c6b5648e89..1f371e3129a 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -44,6 +44,7 @@ class ParallelScannerBuilder {
 public:
     ParallelScannerBuilder(pipeline::OlapScanLocalState* parent,
                            const std::vector<TabletWithVersion>& tablets,
+                           std::vector<TabletReader::ReadSource>& read_sources,
                            const std::shared_ptr<RuntimeProfile>& profile,
                            const std::vector<OlapScanRange*>& key_ranges, RuntimeState* state,
                            int64_t limit, bool is_dup_mow_key, bool is_preaggregation)
@@ -54,7 +55,8 @@ public:
               _is_dup_mow_key(is_dup_mow_key),
               _is_preaggregation(is_preaggregation),
               _tablets(tablets.cbegin(), tablets.cend()),
-              _key_ranges(key_ranges.cbegin(), key_ranges.cend()) {}
+              _key_ranges(key_ranges.cbegin(), key_ranges.cend()),
+              _read_sources(read_sources) {}
 
     Status build_scanners(std::list<VScannerSPtr>& scanners);
 
@@ -93,6 +95,7 @@ private:
     std::vector<TabletWithVersion> _tablets;
     std::vector<OlapScanRange*> _key_ranges;
     std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources;
+    std::vector<TabletReader::ReadSource>& _read_sources;
 };
 
 } // namespace doris
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index bc92acc134a..dca47227914 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -22,7 +22,9 @@
 #include <memory>
 
 #include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet.h"
+#include "cloud/cloud_tablet_hotspot.h"
 #include "cloud/config.h"
 #include "olap/parallel_scanner_builder.h"
 #include "olap/storage_engine.h"
@@ -312,33 +314,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
                          state()->query_options().resource_limit.__isset.cpu_limit;
 
-    std::vector<TabletWithVersion> tablets;
-    tablets.reserve(_scan_ranges.size());
-    for (auto&& scan_range : _scan_ranges) {
-        // TODO(plat1ko): Get cloud tablet in parallel
-        auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
-        int64_t version = 0;
-        std::from_chars(scan_range->version.data(),
-                        scan_range->version.data() + scan_range->version.size(), version);
-        tablets.emplace_back(std::move(tablet), version);
-    }
-
-    if (config::is_cloud_mode()) {
-        int64_t duration_ns = 0;
-        {
-            SCOPED_RAW_TIMER(&duration_ns);
-            std::vector<std::function<Status()>> tasks;
-            tasks.reserve(_scan_ranges.size());
-            for (auto&& [tablet, version] : tablets) {
-                tasks.emplace_back([tablet, version]() {
-                    return std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
-                });
-            }
-            RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
-        }
-        _sync_rowset_timer->update(duration_ns);
-    }
-
+    RETURN_IF_ERROR(hold_tablets());
     if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
         p._push_down_agg_type == TPushAggOp::NONE &&
         (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
@@ -351,8 +327,9 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
             key_ranges.emplace_back(range.get());
         }
 
-        ParallelScannerBuilder scanner_builder(this, tablets, _scanner_profile, key_ranges, state(),
-                                               p._limit, true, p._olap_scan_node.is_preaggregation);
+        ParallelScannerBuilder scanner_builder(this, _tablets, _read_sources, _scanner_profile,
+                                               key_ranges, state(), p._limit, true,
+                                               p._olap_scan_node.is_preaggregation);
 
         int max_scanners_count = state()->parallel_scan_max_scanners_count();
 
@@ -377,18 +354,19 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     }
 
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
-
-    for (auto& scan_range : _scan_ranges) {
-        auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
+    for (size_t scan_range_idx = 0; scan_range_idx < _scan_ranges.size(); scan_range_idx++) {
         int64_t version = 0;
-        std::from_chars(scan_range->version.data(),
-                        scan_range->version.data() + scan_range->version.size(), version);
+        std::from_chars(_scan_ranges[scan_range_idx]->version.data(),
+                        _scan_ranges[scan_range_idx]->version.data() +
+                                _scan_ranges[scan_range_idx]->version.size(),
+                        version);
         std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges;
         int size_based_scanners_per_tablet = 1;
 
         if (config::doris_scan_range_max_mb > 0) {
-            size_based_scanners_per_tablet = std::max(
-                    1, (int)(tablet->tablet_footprint() / (config::doris_scan_range_max_mb << 20)));
+            size_based_scanners_per_tablet =
+                    std::max(1, (int)(_tablets[scan_range_idx].tablet->tablet_footprint() /
+                                      (config::doris_scan_range_max_mb << 20)));
         }
         int ranges_per_scanner =
                 std::max(1, (int)ranges->size() /
@@ -410,9 +388,9 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
                                   state(),
                                   _scanner_profile.get(),
                                   scanner_ranges,
-                                  tablet,
+                                  _tablets[scan_range_idx].tablet,
                                   version,
-                                  {},
+                                  _read_sources[scan_range_idx],
                                   p._limit,
                                   p._olap_scan_node.is_preaggregation,
                           });
@@ -420,10 +398,70 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
             scanners->push_back(std::move(scanner));
         }
     }
+    _tablets.clear();
+    _read_sources.clear();
 
     return Status::OK();
 }
 
+Status OlapScanLocalState::hold_tablets() {
+    if (!_tablets.empty()) {
+        return Status::OK();
+    }
+    MonotonicStopWatch timer;
+    timer.start();
+    _tablets.resize(_scan_ranges.size());
+    _read_sources.resize(_scan_ranges.size());
+    for (size_t i = 0; i < _scan_ranges.size(); i++) {
+        int64_t version = 0;
+        std::from_chars(_scan_ranges[i]->version.data(),
+                        _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), version);
+        auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id));
+        _tablets[i] = {std::move(tablet), version};
+
+        if (config::is_cloud_mode()) {
+            // FIXME(plat1ko): Avoid pointer cast
+            ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(
+                    *_tablets[i].tablet);
+        }
+    }
+
+    if (config::is_cloud_mode()) {
+        int64_t duration_ns = 0;
+        {
+            SCOPED_RAW_TIMER(&duration_ns);
+            std::vector<std::function<Status()>> tasks;
+            tasks.reserve(_scan_ranges.size());
+            for (auto&& [cur_tablet, cur_version] : _tablets) {
+                tasks.emplace_back([cur_tablet, cur_version]() {
+                    return std::dynamic_pointer_cast<CloudTablet>(cur_tablet)
+                            ->sync_rowsets(cur_version);
+                });
+            }
+            RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
+        }
+        _sync_rowset_timer->update(duration_ns);
+    }
+    for (size_t i = 0; i < _scan_ranges.size(); i++) {
+        RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers(
+                {0, _tablets[i].version}, &_read_sources[i].rs_splits,
+                RuntimeFilterConsumer::_state->skip_missing_version()));
+        if (!PipelineXLocalState<>::_state->skip_delete_predicate()) {
+            _read_sources[i].fill_delete_predicates();
+        }
+    }
+    timer.stop();
+    double cost_secs = static_cast<double>(timer.elapsed_time()) / NANOS_PER_SEC;
+    if (cost_secs > 5) {
+        LOG_WARNING(
+                "Try to hold tablets costs {} seconds, it costs too much. (Query-ID={}, NodeId={}, "
+                "ScanRangeNum={})",
+                cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(),
+                _scan_ranges.size());
+    }
+    return Status::OK();
+}
+
 TOlapScanNode& OlapScanLocalState::olap_scan_node() const {
     return _parent->cast<OlapScanOperatorX>()._olap_scan_node;
 }
@@ -633,4 +671,9 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i
     }
 }
 
+Status OlapScanOperatorX::hold_tablets(RuntimeState* state) {
+    auto& local_state = ScanOperatorX<OlapScanLocalState>::get_local_state(state);
+    return local_state.hold_tablets();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h
index 8559bfec2a8..d15a61b7d0f 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -22,6 +22,7 @@
 #include <string>
 
 #include "common/status.h"
+#include "olap/tablet_reader.h"
 #include "operator.h"
 #include "pipeline/exec/scan_operator.h"
 
@@ -49,6 +50,7 @@ public:
                            std::to_string(_parent->node_id()),
                            std::to_string(_parent->nereids_id()), olap_scan_node().table_name);
     }
+    Status hold_tablets();
 
 private:
     friend class vectorized::NewOlapScanner;
@@ -211,6 +213,8 @@ private:
     RuntimeProfile::Counter* _segment_load_index_timer = nullptr;
 
     std::mutex _profile_mtx;
+    std::vector<TabletWithVersion> _tablets;
+    std::vector<TabletReader::ReadSource> _read_sources;
 };
 
 class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
@@ -218,6 +222,7 @@ public:
     OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
                       const DescriptorTbl& descs, int parallel_tasks,
                       const TQueryCacheParam& cache_param);
+    Status hold_tablets(RuntimeState* state) override;
 
 private:
     friend class OlapScanLocalState;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index fcd28f96d6e..7abde975fd0 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -647,6 +647,8 @@ public:
     [[nodiscard]] std::string get_name() const override { return _op_name; }
     [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }
 
+    // Tablets should be hold before open phase.
+    [[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); }
     Status open(RuntimeState* state) override;
 
     [[nodiscard]] virtual Status get_block(RuntimeState* state, 
vectorized::Block* block,
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 23f91cca631..3ec4f537e47 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -150,9 +150,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
             const auto* offset_start = partitioned_block.second.row_idxs->data() +
                                        partitioned_block.second.offset_start;
             auto block_wrapper = partitioned_block.first;
+            Defer defer {[&]() {
+                block_wrapper->unref(local_state._shared_state, local_state._channel_id);
+            }};
             RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start,
                                                    offset_start + partitioned_block.second.length));
-            block_wrapper->unref(local_state._shared_state, local_state._channel_id);
         } while (mutable_block.rows() < state->batch_size() && !*eos &&
                  _dequeue_data(local_state, partitioned_block, eos, block));
         return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 9d83c475778..5b5698936d7 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -311,6 +311,9 @@ Status PipelineTask::execute(bool* eos) {
         query_context()->update_cpu_time(delta_cpu_time);
     }};
     if (_wait_to_start()) {
+        if (config::enable_prefetch_tablet) {
+            RETURN_IF_ERROR(_source->hold_tablets(_state));
+        }
         return Status::OK();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to