This is an automated email from the ASF dual-hosted git repository. zhangstar333 pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 695d58f3543 [cherry-pick](scan)scanner could eos early when reached limit (#36535) (#36736) 695d58f3543 is described below commit 695d58f3543ae75731bbbf13d9dfd36687eb9d7c Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Tue Jun 25 17:22:43 2024 +0800 [cherry-pick](scan)scanner could eos early when reached limit (#36535) (#36736) ## Proposed changes cherry-pick from master #36535 --- be/src/olap/parallel_scanner_builder.cpp | 5 ++--- be/src/olap/parallel_scanner_builder.h | 6 +++--- be/src/pipeline/exec/es_scan_operator.cpp | 4 ++-- be/src/pipeline/exec/file_scan_operator.cpp | 3 +-- be/src/pipeline/exec/jdbc_scan_operator.cpp | 2 +- be/src/pipeline/exec/meta_scan_operator.cpp | 3 +-- be/src/pipeline/exec/olap_scan_operator.cpp | 4 ++-- be/src/vec/exec/scan/scanner_context.cpp | 6 +++++- 8 files changed, 17 insertions(+), 16 deletions(-) diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index a7f7d6da001..5ad74232215 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -197,9 +197,8 @@ template <typename ParentType> std::shared_ptr<NewOlapScanner> ParallelScannerBuilder<ParentType>::_build_scanner( BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges, TabletReader::ReadSource&& read_source) { - NewOlapScanner::Params params { - _state, _scanner_profile.get(), key_ranges, std::move(tablet), - version, std::move(read_source), _limit_per_scanner, _is_preaggregation}; + NewOlapScanner::Params params {_state, _scanner_profile.get(), key_ranges, std::move(tablet), + version, std::move(read_source), _limit, _is_preaggregation}; return NewOlapScanner::create_shared(_parent, std::move(params)); } diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h index b9d659abc27..7d28dd706f5 100644 --- 
a/be/src/olap/parallel_scanner_builder.h +++ b/be/src/olap/parallel_scanner_builder.h @@ -46,11 +46,11 @@ public: ParallelScannerBuilder(ParentType* parent, const std::vector<TabletWithVersion>& tablets, const std::shared_ptr<RuntimeProfile>& profile, const std::vector<OlapScanRange*>& key_ranges, RuntimeState* state, - int64_t limit_per_scanner, bool is_dup_mow_key, bool is_preaggregation) + int64_t limit, bool is_dup_mow_key, bool is_preaggregation) : _parent(parent), _scanner_profile(profile), _state(state), - _limit_per_scanner(limit_per_scanner), + _limit(limit), _is_dup_mow_key(is_dup_mow_key), _is_preaggregation(is_preaggregation), _tablets(tablets.cbegin(), tablets.cend()), @@ -87,7 +87,7 @@ private: std::shared_ptr<RuntimeProfile> _scanner_profile; RuntimeState* _state; - int64_t _limit_per_scanner; + int64_t _limit; bool _is_dup_mow_key; bool _is_preaggregation; std::vector<TabletWithVersion> _tablets; diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index c00ee6917ea..0e5018b85d6 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -95,8 +95,8 @@ Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca properties, p._column_names, p._docvalue_context, &doc_value_mode); std::shared_ptr<vectorized::NewEsScanner> scanner = vectorized::NewEsScanner::create_shared( - vectorized::RuntimeFilterConsumer::_state, this, p._limit_per_scanner, p._tuple_id, - properties, p._docvalue_context, doc_value_mode, + vectorized::RuntimeFilterConsumer::_state, this, p._limit, p._tuple_id, properties, + p._docvalue_context, doc_value_mode, vectorized::RuntimeFilterConsumer::_state->runtime_profile()); RETURN_IF_ERROR( diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 59dbbe8d1a5..9b2eeb9b28b 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ 
b/be/src/pipeline/exec/file_scan_operator.cpp @@ -45,8 +45,7 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s _kv_cache.reset(new vectorized::ShardedKVCache(shard_num)); for (int i = 0; i < _max_scanners; ++i) { std::unique_ptr<vectorized::VFileScanner> scanner = vectorized::VFileScanner::create_unique( - state(), this, p._limit_per_scanner, _split_source, _scanner_profile.get(), - _kv_cache.get()); + state(), this, p._limit, _split_source, _scanner_profile.get(), _kv_cache.get()); RETURN_IF_ERROR( scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id)); scanners->push_back(std::move(scanner)); diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp b/be/src/pipeline/exec/jdbc_scan_operator.cpp index f6c22db9283..35ad7ec0490 100644 --- a/be/src/pipeline/exec/jdbc_scan_operator.cpp +++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp @@ -30,7 +30,7 @@ std::string JDBCScanLocalState::name_suffix() const { Status JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { auto& p = _parent->cast<JDBCScanOperatorX>(); std::unique_ptr<vectorized::NewJdbcScanner> scanner = vectorized::NewJdbcScanner::create_unique( - state(), this, p._limit_per_scanner, p._tuple_id, p._query_string, p._table_type, + state(), this, p._limit, p._tuple_id, p._query_string, p._table_type, _scanner_profile.get()); RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts)); scanners->push_back(std::move(scanner)); diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp index 749fbcf333a..e5edc001bea 100644 --- a/be/src/pipeline/exec/meta_scan_operator.cpp +++ b/be/src/pipeline/exec/meta_scan_operator.cpp @@ -30,8 +30,7 @@ Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s for (auto& scan_range : _scan_ranges) { std::shared_ptr<vectorized::VMetaScanner> scanner = vectorized::VMetaScanner::create_shared( - state(), this, p._tuple_id, scan_range, 
p._limit_per_scanner, profile(), - p._user_identity); + state(), this, p._tuple_id, scan_range, p._limit, profile(), p._user_identity); RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts)); scanners->push_back(scanner); } diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 00650b8a976..d9e22846377 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -287,7 +287,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s } ParallelScannerBuilder<OlapScanLocalState> scanner_builder( - this, tablets, _scanner_profile, key_ranges, state(), p._limit_per_scanner, true, + this, tablets, _scanner_profile, key_ranges, state(), p._limit, true, p._olap_scan_node.is_preaggregation); int max_scanners_count = state()->parallel_scan_max_scanners_count(); @@ -326,7 +326,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s std::move(tablet), version, {}, - p._limit_per_scanner, + p._limit, p._olap_scan_node.is_preaggregation, }); RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts)); diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index c2de00830fc..632c92f7a81 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -80,7 +80,11 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu : state->query_parallel_instance_num()); _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size()); - + // 1. Calculate max concurrency + // For select * from table limit 10; should just use one thread. + if (_local_state && _local_state->should_run_serial()) { + _max_thread_num = 1; + } // when user not specify scan_thread_num, so we can try downgrade _max_thread_num. 
// because we found in a table with 5k columns, column reader may occupy too much memory. // you can refer to https://github.com/apache/doris/issues/35340 for details. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org