This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 9305725f5ed [branch-3.0](pick) pick #47501 #47535 (#47602) 9305725f5ed is described below commit 9305725f5ed7e983182eca5f6e4078406efc8266 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Sat Feb 8 15:15:24 2025 +0800 [branch-3.0](pick) pick #47501 #47535 (#47602) --- be/src/common/config.cpp | 3 + be/src/common/config.h | 3 + be/src/olap/parallel_scanner_builder.cpp | 11 +- be/src/olap/parallel_scanner_builder.h | 5 +- be/src/pipeline/exec/olap_scan_operator.cpp | 119 ++++++++++++++------- be/src/pipeline/exec/olap_scan_operator.h | 5 + be/src/pipeline/exec/operator.h | 2 + be/src/pipeline/local_exchange/local_exchanger.cpp | 4 +- be/src/pipeline/pipeline_task.cpp | 3 + 9 files changed, 109 insertions(+), 46 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 73d801f0828..089766214cf 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1031,6 +1031,9 @@ DEFINE_mInt32(segcompaction_num_threads, "5"); // enable java udf and jdbc scannode DEFINE_Bool(enable_java_support, "true"); +// enable prefetch tablets before opening +DEFINE_mBool(enable_prefetch_tablet, "true"); + // Set config randomly to check more issues in github workflow DEFINE_Bool(enable_fuzzy_mode, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index a750c48e921..e3900c0dd86 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1076,6 +1076,9 @@ DECLARE_mInt32(segcompaction_num_threads); // enable java udf and jdbc scannode DECLARE_Bool(enable_java_support); +// enable prefetch tablets before opening +DECLARE_mBool(enable_prefetch_tablet); + // Set config randomly to check more issues in github workflow DECLARE_Bool(enable_fuzzy_mode); diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index 88c69ab5c9a..103e6341d7c 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ 
b/be/src/olap/parallel_scanner_builder.cpp @@ -164,17 +164,15 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>& */ Status ParallelScannerBuilder::_load() { _total_rows = 0; + size_t idx = 0; for (auto&& [tablet, version] : _tablets) { const auto tablet_id = tablet->tablet_id(); - auto& read_source = _all_read_sources[tablet_id]; - RETURN_IF_ERROR(tablet->capture_rs_readers({0, version}, &read_source.rs_splits, false)); - if (!_state->skip_delete_predicate()) { - read_source.fill_delete_predicates(); - } + _all_read_sources[tablet_id] = _read_sources[idx]; + const auto& read_source = _all_read_sources[tablet_id]; + bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache ? _state->query_options().enable_segment_cache : true; - for (auto& rs_split : read_source.rs_splits) { auto rowset = rs_split.rs_reader->rowset(); RETURN_IF_ERROR(rowset->load()); @@ -190,6 +188,7 @@ Status ParallelScannerBuilder::_load() { } _total_rows += rowset->num_rows(); } + idx++; } _rows_per_scanner = _total_rows / _max_scanners_count; diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h index 7c6b5648e89..1f371e3129a 100644 --- a/be/src/olap/parallel_scanner_builder.h +++ b/be/src/olap/parallel_scanner_builder.h @@ -44,6 +44,7 @@ class ParallelScannerBuilder { public: ParallelScannerBuilder(pipeline::OlapScanLocalState* parent, const std::vector<TabletWithVersion>& tablets, + std::vector<TabletReader::ReadSource>& read_sources, const std::shared_ptr<RuntimeProfile>& profile, const std::vector<OlapScanRange*>& key_ranges, RuntimeState* state, int64_t limit, bool is_dup_mow_key, bool is_preaggregation) @@ -54,7 +55,8 @@ public: _is_dup_mow_key(is_dup_mow_key), _is_preaggregation(is_preaggregation), _tablets(tablets.cbegin(), tablets.cend()), - _key_ranges(key_ranges.cbegin(), key_ranges.cend()) {} + _key_ranges(key_ranges.cbegin(), key_ranges.cend()), + _read_sources(read_sources) {} Status 
build_scanners(std::list<VScannerSPtr>& scanners); @@ -93,6 +95,7 @@ private: std::vector<TabletWithVersion> _tablets; std::vector<OlapScanRange*> _key_ranges; std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources; + std::vector<TabletReader::ReadSource>& _read_sources; }; } // namespace doris diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index bc92acc134a..dca47227914 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -22,7 +22,9 @@ #include <memory> #include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" +#include "cloud/cloud_tablet_hotspot.h" #include "cloud/config.h" #include "olap/parallel_scanner_builder.h" #include "olap/storage_engine.h" @@ -312,33 +314,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s bool has_cpu_limit = state()->query_options().__isset.resource_limit && state()->query_options().resource_limit.__isset.cpu_limit; - std::vector<TabletWithVersion> tablets; - tablets.reserve(_scan_ranges.size()); - for (auto&& scan_range : _scan_ranges) { - // TODO(plat1ko): Get cloud tablet in parallel - auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); - int64_t version = 0; - std::from_chars(scan_range->version.data(), - scan_range->version.data() + scan_range->version.size(), version); - tablets.emplace_back(std::move(tablet), version); - } - - if (config::is_cloud_mode()) { - int64_t duration_ns = 0; - { - SCOPED_RAW_TIMER(&duration_ns); - std::vector<std::function<Status()>> tasks; - tasks.reserve(_scan_ranges.size()); - for (auto&& [tablet, version] : tablets) { - tasks.emplace_back([tablet, version]() { - return std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version); - }); - } - RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10)); - } - _sync_rowset_timer->update(duration_ns); - } - + 
RETURN_IF_ERROR(hold_tablets()); if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit && p._push_down_agg_type == TPushAggOp::NONE && (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) { @@ -351,8 +327,9 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s key_ranges.emplace_back(range.get()); } - ParallelScannerBuilder scanner_builder(this, tablets, _scanner_profile, key_ranges, state(), - p._limit, true, p._olap_scan_node.is_preaggregation); + ParallelScannerBuilder scanner_builder(this, _tablets, _read_sources, _scanner_profile, + key_ranges, state(), p._limit, true, + p._olap_scan_node.is_preaggregation); int max_scanners_count = state()->parallel_scan_max_scanners_count(); @@ -377,18 +354,19 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); - - for (auto& scan_range : _scan_ranges) { - auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); + for (size_t scan_range_idx = 0; scan_range_idx < _scan_ranges.size(); scan_range_idx++) { int64_t version = 0; - std::from_chars(scan_range->version.data(), - scan_range->version.data() + scan_range->version.size(), version); + std::from_chars(_scan_ranges[scan_range_idx]->version.data(), + _scan_ranges[scan_range_idx]->version.data() + + _scan_ranges[scan_range_idx]->version.size(), + version); std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges; int size_based_scanners_per_tablet = 1; if (config::doris_scan_range_max_mb > 0) { - size_based_scanners_per_tablet = std::max( - 1, (int)(tablet->tablet_footprint() / (config::doris_scan_range_max_mb << 20))); + size_based_scanners_per_tablet = + std::max(1, (int)(_tablets[scan_range_idx].tablet->tablet_footprint() / + (config::doris_scan_range_max_mb << 20))); } int ranges_per_scanner = std::max(1, (int)ranges->size() / @@ -410,9 +388,9 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s state(), _scanner_profile.get(), scanner_ranges, - tablet, + _tablets[scan_range_idx].tablet, version, - {}, + _read_sources[scan_range_idx], p._limit, p._olap_scan_node.is_preaggregation, }); @@ -420,10 +398,70 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s scanners->push_back(std::move(scanner)); } } + _tablets.clear(); + _read_sources.clear(); return Status::OK(); } +Status OlapScanLocalState::hold_tablets() { + if (!_tablets.empty()) { + return Status::OK(); + } + MonotonicStopWatch timer; + timer.start(); + _tablets.resize(_scan_ranges.size()); + _read_sources.resize(_scan_ranges.size()); + for (size_t i = 0; i < _scan_ranges.size(); i++) { + int64_t version = 0; + std::from_chars(_scan_ranges[i]->version.data(), + _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), version); + auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id)); + _tablets[i] = {std::move(tablet), version}; + + if (config::is_cloud_mode()) { + // FIXME(plat1ko): Avoid pointer cast + ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count( + *_tablets[i].tablet); + } + } + + if (config::is_cloud_mode()) { + int64_t duration_ns = 0; + { + SCOPED_RAW_TIMER(&duration_ns); + std::vector<std::function<Status()>> tasks; + tasks.reserve(_scan_ranges.size()); + for (auto&& [cur_tablet, cur_version] : _tablets) { + tasks.emplace_back([cur_tablet, cur_version]() { + return std::dynamic_pointer_cast<CloudTablet>(cur_tablet) + ->sync_rowsets(cur_version); + }); + } + RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10)); + } + _sync_rowset_timer->update(duration_ns); + } + for (size_t i = 0; i < _scan_ranges.size(); i++) { + RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers( + {0, _tablets[i].version}, &_read_sources[i].rs_splits, + RuntimeFilterConsumer::_state->skip_missing_version())); + if 
(!PipelineXLocalState<>::_state->skip_delete_predicate()) { + _read_sources[i].fill_delete_predicates(); + } + } + timer.stop(); + double cost_secs = static_cast<double>(timer.elapsed_time()) / NANOS_PER_SEC; + if (cost_secs > 5) { + LOG_WARNING( + "Try to hold tablets costs {} seconds, it costs too much. (Query-ID={}, NodeId={}, " + "ScanRangeNum={})", + cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(), + _scan_ranges.size()); + } + return Status::OK(); +} + TOlapScanNode& OlapScanLocalState::olap_scan_node() const { return _parent->cast<OlapScanOperatorX>()._olap_scan_node; } @@ -633,4 +671,9 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i } } +Status OlapScanOperatorX::hold_tablets(RuntimeState* state) { + auto& local_state = ScanOperatorX<OlapScanLocalState>::get_local_state(state); + return local_state.hold_tablets(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 8559bfec2a8..d15a61b7d0f 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -22,6 +22,7 @@ #include <string> #include "common/status.h" +#include "olap/tablet_reader.h" #include "operator.h" #include "pipeline/exec/scan_operator.h" @@ -49,6 +50,7 @@ public: std::to_string(_parent->node_id()), std::to_string(_parent->nereids_id()), olap_scan_node().table_name); } + Status hold_tablets(); private: friend class vectorized::NewOlapScanner; @@ -211,6 +213,8 @@ private: RuntimeProfile::Counter* _segment_load_index_timer = nullptr; std::mutex _profile_mtx; + std::vector<TabletWithVersion> _tablets; + std::vector<TabletReader::ReadSource> _read_sources; }; class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> { @@ -218,6 +222,7 @@ public: OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs, int parallel_tasks, const 
TQueryCacheParam& cache_param); + Status hold_tablets(RuntimeState* state) override; private: friend class OlapScanLocalState; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index fcd28f96d6e..7abde975fd0 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -647,6 +647,8 @@ public: [[nodiscard]] std::string get_name() const override { return _op_name; } [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; } + // Tablets should be hold before open phase. + [[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); } Status open(RuntimeState* state) override; [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block, diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 23f91cca631..3ec4f537e47 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -150,9 +150,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block const auto* offset_start = partitioned_block.second.row_idxs->data() + partitioned_block.second.offset_start; auto block_wrapper = partitioned_block.first; + Defer defer {[&]() { + block_wrapper->unref(local_state._shared_state, local_state._channel_id); + }}; RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start, offset_start + partitioned_block.second.length)); - block_wrapper->unref(local_state._shared_state, local_state._channel_id); } while (mutable_block.rows() < state->batch_size() && !*eos && _dequeue_data(local_state, partitioned_block, eos, block)); return Status::OK(); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 9d83c475778..5b5698936d7 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -311,6 +311,9 @@ Status 
PipelineTask::execute(bool* eos) { query_context()->update_cpu_time(delta_cpu_time); }}; if (_wait_to_start()) { + if (config::enable_prefetch_tablet) { + RETURN_IF_ERROR(_source->hold_tablets(_state)); + } return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org