This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d7c99f8d3fb [Opt] (multi-catalog) opt max scanner thread number in 
batch split mode. (#44635)
d7c99f8d3fb is described below

commit d7c99f8d3fbaa94447b8cb064319db3cff95ed73
Author: Qi Chen <che...@selectdb.com>
AuthorDate: Wed Jan 22 22:59:58 2025 +0800

    [Opt] (multi-catalog) opt max scanner thread number in batch split mode. 
(#44635)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    In batch split mode there is only one scan range per backend, so each
    backend starts only one ScanNode instance. However, when the scan
    operator calculated scanner concurrency in batch split mode, the
    scanner thread pool size was divided by `query_parallel_instance_num`
    instead of by 1, resulting in poor performance of batch split mode.
---
 be/src/pipeline/exec/file_scan_operator.cpp | 45 ++++++++++++++++++++---------
 be/src/pipeline/exec/file_scan_operator.h   |  6 ++++
 be/src/pipeline/exec/scan_operator.cpp      |  5 +++-
 be/src/pipeline/exec/scan_operator.h        |  6 ++++
 be/src/vec/exec/scan/scanner_context.cpp    |  9 +++---
 be/src/vec/exec/scan/scanner_context.h      |  4 ++-
 6 files changed, 54 insertions(+), 21 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 7afbb29134c..3cb934c1015 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -26,6 +26,7 @@
 #include "pipeline/exec/olap_scan_operator.h"
 #include "pipeline/exec/scan_operator.h"
 #include "vec/exec/format/format_common.h"
+#include "vec/exec/scan/scanner_context.h"
 #include "vec/exec/scan/vfile_scanner.h"
 
 namespace doris::pipeline {
@@ -37,9 +38,10 @@ Status 
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     }
 
     auto& p = _parent->cast<FileScanOperatorX>();
-    uint32_t shard_num = std::min(
-            config::doris_scanner_thread_pool_thread_num / 
state()->query_parallel_instance_num(),
-            _max_scanners);
+    // There's only one scan range for each backend in batch split mode. Each 
backend only starts up one ScanNode instance.
+    uint32_t shard_num =
+            std::min(config::doris_scanner_thread_pool_thread_num / 
p.query_parallel_instance_num(),
+                     _max_scanners);
     shard_num = std::max(shard_num, 1U);
     _kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
     for (int i = 0; i < _max_scanners; ++i) {
@@ -60,28 +62,43 @@ std::string FileScanLocalState::name_suffix() const {
 
 void FileScanLocalState::set_scan_ranges(RuntimeState* state,
                                          const std::vector<TScanRangeParams>& 
scan_ranges) {
-    _max_scanners =
-            config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
-    _max_scanners = std::max(std::max(_max_scanners, 
state->parallel_scan_max_scanners_count()), 1);
-    // For select * from table limit 10; should just use one thread.
-    if (should_run_serial()) {
-        _max_scanners = 1;
-    }
+    auto& p = _parent->cast<FileScanOperatorX>();
+
+    auto calc_max_scanners = [&](int parallel_instance_num) -> int {
+        int max_scanners = config::doris_scanner_thread_pool_thread_num / 
parallel_instance_num;
+        max_scanners =
+                std::max(std::max(max_scanners, 
state->parallel_scan_max_scanners_count()), 1);
+        if (should_run_serial()) {
+            max_scanners = 1;
+        }
+        return max_scanners;
+    };
+
     if (scan_ranges.size() == 1) {
         auto scan_range = 
scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
         if (scan_range.__isset.split_source) {
+            p._batch_split_mode = true;
             auto split_source = scan_range.split_source;
             RuntimeProfile::Counter* get_split_timer = 
ADD_TIMER(_runtime_profile, "GetSplitTime");
+
+            _max_scanners = calc_max_scanners(p.query_parallel_instance_num());
             _split_source = 
std::make_shared<vectorized::RemoteSplitSourceConnector>(
                     state, get_split_timer, split_source.split_source_id, 
split_source.num_splits,
                     _max_scanners);
         }
     }
-    if (_split_source == nullptr) {
-        _split_source =
-                
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges, 
_max_scanners);
+
+    if (!p._batch_split_mode) {
+        _max_scanners = calc_max_scanners(p.query_parallel_instance_num());
+        if (_split_source == nullptr) {
+            _split_source = 
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
+                                                                               
     _max_scanners);
+        }
+        // currently the total number of splits in the batch split mode cannot 
be accurately obtained,
+        // so we don't do it in the batch split mode.
+        _max_scanners = std::min(_max_scanners, 
_split_source->num_scan_ranges());
     }
-    _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
+
     if (scan_ranges.size() > 0 &&
         
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
         // for compatibility.
diff --git a/be/src/pipeline/exec/file_scan_operator.h 
b/be/src/pipeline/exec/file_scan_operator.h
index 87c5bcd2e54..8b7b25a025e 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -81,10 +81,16 @@ public:
 
     bool is_file_scan_operator() const override { return true; }
 
+    // There's only one scan range for each backend in batch split mode. Each 
backend only starts up one ScanNode instance.
+    int query_parallel_instance_num() const override {
+        return _batch_split_mode ? 1 : _query_parallel_instance_num;
+    }
+
 private:
     friend class FileScanLocalState;
 
     const std::string _table_name;
+    bool _batch_split_mode = false;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index c694822adf3..d437289d6b7 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -996,7 +996,8 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
-            _scan_dependency, p.is_serial_operator(), 
p.is_file_scan_operator());
+            _scan_dependency, p.is_serial_operator(), 
p.is_file_scan_operator(),
+            p.query_parallel_instance_num());
     return Status::OK();
 }
 
@@ -1205,6 +1206,8 @@ Status ScanOperatorX<LocalStateType>::init(const 
TPlanNode& tnode, RuntimeState*
         }
     }
 
+    _query_parallel_instance_num = state->query_parallel_instance_num();
+
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index c6c9cdf405d..f9a0b5dc428 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -372,6 +372,10 @@ public:
 
     [[nodiscard]] virtual bool is_file_scan_operator() const { return false; }
 
+    [[nodiscard]] virtual int query_parallel_instance_num() const {
+        return _query_parallel_instance_num;
+    }
+
     const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
         return _runtime_filter_descs;
     }
@@ -434,6 +438,8 @@ protected:
     int64_t _push_down_count = -1;
     const int _parallel_tasks = 0;
 
+    int _query_parallel_instance_num = 0;
+
     std::vector<int> topn_filter_source_node_ids;
 };
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 688204bdff8..a683e31acda 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -47,7 +47,7 @@ ScannerContext::ScannerContext(
         const TupleDescriptor* output_tuple_desc, const RowDescriptor* 
output_row_descriptor,
         const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& 
scanners, int64_t limit_,
         std::shared_ptr<pipeline::Dependency> dependency, bool 
ignore_data_distribution,
-        bool is_file_scan_operator)
+        bool is_file_scan_operator, int num_parallel_instances)
         : HasTaskExecutionCtx(state),
           _state(state),
           _local_state(local_state),
@@ -60,7 +60,8 @@ ScannerContext::ScannerContext(
           _scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
           _all_scanners(scanners.begin(), scanners.end()),
           _ignore_data_distribution(ignore_data_distribution),
-          _is_file_scan_operator(is_file_scan_operator) {
+          _is_file_scan_operator(is_file_scan_operator),
+          _num_parallel_instances(num_parallel_instances) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
@@ -105,8 +106,6 @@ Status ScannerContext::init() {
     _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
                                                     thread_token == nullptr ? 
"False" : "True");
 
-    const int num_parallel_instances = _state->query_parallel_instance_num();
-
     // _max_bytes_in_queue controls the maximum memory that can be used by a 
single scan instance.
     // scan_queue_mem_limit on FE is 100MB by default, on backend we will make 
sure its actual value
     // is larger than 10MB.
@@ -176,7 +175,7 @@ Status ScannerContext::init() {
         } else {
             const size_t factor = _is_file_scan_operator ? 1 : 4;
             _max_thread_num = factor * 
(config::doris_scanner_thread_pool_thread_num /
-                                        num_parallel_instances);
+                                        _num_parallel_instances);
             // In some rare cases, user may set num_parallel_instances to 1 
handly to make many query could be executed
             // in parallel. We need to make sure the _max_thread_num is 
smaller than previous value.
             _max_thread_num =
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index d1cf06d5668..b385855d0f2 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,8 @@ public:
                    const RowDescriptor* output_row_descriptor,
                    const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
                    int64_t limit_, std::shared_ptr<pipeline::Dependency> 
dependency,
-                   bool ignore_data_distribution, bool is_file_scan_operator);
+                   bool ignore_data_distribution, bool is_file_scan_operator,
+                   int num_parallel_instances);
 
     ~ScannerContext() override {
         
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -213,6 +214,7 @@ protected:
     std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
     bool _ignore_data_distribution = false;
     bool _is_file_scan_operator = false;
+    int _num_parallel_instances;
 
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to