This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 695d58f3543 [cherry-pick](scan)scanner could eos early when reached limit (#36535) (#36736)
695d58f3543 is described below

commit 695d58f3543ae75731bbbf13d9dfd36687eb9d7c
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Tue Jun 25 17:22:43 2024 +0800

    [cherry-pick](scan)scanner could eos early when reached limit (#36535) (#36736)
    
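    ## Proposed changes
    cherry-pick from master #36535

    Every scan operator (OLAP, parallel OLAP builder, ES, file, JDBC, meta)
    now passes the operator's full `_limit` to each scanner instead of
    `_limit_per_scanner`, so a scanner can reach end-of-stream as soon as the
    query limit is satisfied. In addition, `ScannerContext` caps
    `_max_thread_num` at 1 when the local state reports `should_run_serial()`
    (e.g. `select * from table limit 10`).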
---
 be/src/olap/parallel_scanner_builder.cpp    | 5 ++---
 be/src/olap/parallel_scanner_builder.h      | 6 +++---
 be/src/pipeline/exec/es_scan_operator.cpp   | 4 ++--
 be/src/pipeline/exec/file_scan_operator.cpp | 3 +--
 be/src/pipeline/exec/jdbc_scan_operator.cpp | 2 +-
 be/src/pipeline/exec/meta_scan_operator.cpp | 3 +--
 be/src/pipeline/exec/olap_scan_operator.cpp | 4 ++--
 be/src/vec/exec/scan/scanner_context.cpp    | 6 +++++-
 8 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/be/src/olap/parallel_scanner_builder.cpp 
b/be/src/olap/parallel_scanner_builder.cpp
index a7f7d6da001..5ad74232215 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -197,9 +197,8 @@ template <typename ParentType>
 std::shared_ptr<NewOlapScanner> 
ParallelScannerBuilder<ParentType>::_build_scanner(
         BaseTabletSPtr tablet, int64_t version, const 
std::vector<OlapScanRange*>& key_ranges,
         TabletReader::ReadSource&& read_source) {
-    NewOlapScanner::Params params {
-            _state,  _scanner_profile.get(), key_ranges,         
std::move(tablet),
-            version, std::move(read_source), _limit_per_scanner, 
_is_preaggregation};
+    NewOlapScanner::Params params {_state,  _scanner_profile.get(), 
key_ranges, std::move(tablet),
+                                   version, std::move(read_source), _limit,    
 _is_preaggregation};
     return NewOlapScanner::create_shared(_parent, std::move(params));
 }
 
diff --git a/be/src/olap/parallel_scanner_builder.h 
b/be/src/olap/parallel_scanner_builder.h
index b9d659abc27..7d28dd706f5 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -46,11 +46,11 @@ public:
     ParallelScannerBuilder(ParentType* parent, const 
std::vector<TabletWithVersion>& tablets,
                            const std::shared_ptr<RuntimeProfile>& profile,
                            const std::vector<OlapScanRange*>& key_ranges, 
RuntimeState* state,
-                           int64_t limit_per_scanner, bool is_dup_mow_key, 
bool is_preaggregation)
+                           int64_t limit, bool is_dup_mow_key, bool 
is_preaggregation)
             : _parent(parent),
               _scanner_profile(profile),
               _state(state),
-              _limit_per_scanner(limit_per_scanner),
+              _limit(limit),
               _is_dup_mow_key(is_dup_mow_key),
               _is_preaggregation(is_preaggregation),
               _tablets(tablets.cbegin(), tablets.cend()),
@@ -87,7 +87,7 @@ private:
 
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     RuntimeState* _state;
-    int64_t _limit_per_scanner;
+    int64_t _limit;
     bool _is_dup_mow_key;
     bool _is_preaggregation;
     std::vector<TabletWithVersion> _tablets;
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp 
b/be/src/pipeline/exec/es_scan_operator.cpp
index c00ee6917ea..0e5018b85d6 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -95,8 +95,8 @@ Status 
EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca
                 properties, p._column_names, p._docvalue_context, 
&doc_value_mode);
 
         std::shared_ptr<vectorized::NewEsScanner> scanner = 
vectorized::NewEsScanner::create_shared(
-                vectorized::RuntimeFilterConsumer::_state, this, 
p._limit_per_scanner, p._tuple_id,
-                properties, p._docvalue_context, doc_value_mode,
+                vectorized::RuntimeFilterConsumer::_state, this, p._limit, 
p._tuple_id, properties,
+                p._docvalue_context, doc_value_mode,
                 vectorized::RuntimeFilterConsumer::_state->runtime_profile());
 
         RETURN_IF_ERROR(
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 59dbbe8d1a5..9b2eeb9b28b 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -45,8 +45,7 @@ Status 
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     _kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
     for (int i = 0; i < _max_scanners; ++i) {
         std::unique_ptr<vectorized::VFileScanner> scanner = 
vectorized::VFileScanner::create_unique(
-                state(), this, p._limit_per_scanner, _split_source, 
_scanner_profile.get(),
-                _kv_cache.get());
+                state(), this, p._limit, _split_source, 
_scanner_profile.get(), _kv_cache.get());
         RETURN_IF_ERROR(
                 scanner->prepare(_conjuncts, &_colname_to_value_range, 
&_colname_to_slot_id));
         scanners->push_back(std::move(scanner));
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp 
b/be/src/pipeline/exec/jdbc_scan_operator.cpp
index f6c22db9283..35ad7ec0490 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp
@@ -30,7 +30,7 @@ std::string JDBCScanLocalState::name_suffix() const {
 Status JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* 
scanners) {
     auto& p = _parent->cast<JDBCScanOperatorX>();
     std::unique_ptr<vectorized::NewJdbcScanner> scanner = 
vectorized::NewJdbcScanner::create_unique(
-            state(), this, p._limit_per_scanner, p._tuple_id, p._query_string, 
p._table_type,
+            state(), this, p._limit, p._tuple_id, p._query_string, 
p._table_type,
             _scanner_profile.get());
     RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
     scanners->push_back(std::move(scanner));
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp 
b/be/src/pipeline/exec/meta_scan_operator.cpp
index 749fbcf333a..e5edc001bea 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -30,8 +30,7 @@ Status 
MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
 
     for (auto& scan_range : _scan_ranges) {
         std::shared_ptr<vectorized::VMetaScanner> scanner = 
vectorized::VMetaScanner::create_shared(
-                state(), this, p._tuple_id, scan_range, p._limit_per_scanner, 
profile(),
-                p._user_identity);
+                state(), this, p._tuple_id, scan_range, p._limit, profile(), 
p._user_identity);
         RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
         scanners->push_back(scanner);
     }
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 00650b8a976..d9e22846377 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -287,7 +287,7 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
         }
 
         ParallelScannerBuilder<OlapScanLocalState> scanner_builder(
-                this, tablets, _scanner_profile, key_ranges, state(), 
p._limit_per_scanner, true,
+                this, tablets, _scanner_profile, key_ranges, state(), 
p._limit, true,
                 p._olap_scan_node.is_preaggregation);
 
         int max_scanners_count = state()->parallel_scan_max_scanners_count();
@@ -326,7 +326,7 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
                               std::move(tablet),
                               version,
                               {},
-                              p._limit_per_scanner,
+                              p._limit,
                               p._olap_scan_node.is_preaggregation,
                       });
         RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index c2de00830fc..632c92f7a81 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -80,7 +80,11 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
                                                       : 
state->query_parallel_instance_num());
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
     _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
-
+    // 1. Calculate max concurrency
+    // For select * from table limit 10; should just use one thread.
+    if (_local_state && _local_state->should_run_serial()) {
+        _max_thread_num = 1;
+    }
     // when user not specify scan_thread_num, so we can try downgrade 
_max_thread_num.
     // becaue we found in a table with 5k columns, column reader may ocuppy 
too much memory.
     // you can refer https://github.com/apache/doris/issues/35340 for details.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to