This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cd2d79c6e2d branch-3.0: [opt](remote scan) Fix remote scan parallelism (#43625)
cd2d79c6e2d is described below

commit cd2d79c6e2d592e4bb6f8fae3b701c137f9e2e39
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 11 22:53:57 2024 +0800

    branch-3.0: [opt](remote scan) Fix remote scan parallelism (#43625)
    
    Cherry-picked from #43532
    
    Co-authored-by: zhiqiang <seuhezhiqi...@163.com>
    Co-authored-by: zhiqiang-hhhh <hezhiqi...@flywheels.com>
---
 be/src/pipeline/exec/scan_operator.cpp   |  2 +-
 be/src/vec/exec/scan/scanner_context.cpp | 16 +++++++++++-----
 be/src/vec/exec/scan/scanner_context.h   |  3 ++-
 3 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 880a3d6e513..32943c4d44e 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -994,7 +994,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
-            _scan_dependency, p.is_serial_operator());
+            _scan_dependency, p.is_serial_operator(), p.is_file_scan_operator());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index bea222bd0f3..d37d26b09f7 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,7 +46,8 @@ ScannerContext::ScannerContext(
         RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
         const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
         const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
-        std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution)
+        std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution,
+        bool is_file_scan_operator)
         : HasTaskExecutionCtx(state),
           _state(state),
           _local_state(local_state),
@@ -58,7 +59,8 @@ ScannerContext::ScannerContext(
           limit(limit_),
           _scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
           _all_scanners(scanners.begin(), scanners.end()),
-          _ignore_data_distribution(ignore_data_distribution) {
+          _ignore_data_distribution(ignore_data_distribution),
+          _is_file_scan_operator(is_file_scan_operator) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
@@ -143,7 +145,10 @@ Status ScannerContext::init() {
     }
 
     // _scannner_scheduler will be used to submit scan task.
-    if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) {
+    // file_scan_operator currently has performance issue if we submit too many scan tasks to scheduler.
+    // we should fix this problem in the future.
+    if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size ||
+        _is_file_scan_operator) {
         submit_many_scan_tasks_for_potential_performance_issue = false;
     }
 
@@ -166,8 +171,9 @@ Status ScannerContext::init() {
         if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) {
             _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
         } else {
-            _max_thread_num =
-                    4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances);
+            const size_t factor = _is_file_scan_operator ? 1 : 4;
+            _max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num /
+                                        num_parallel_instances);
             // In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed
             // in parallel. We need to make sure the _max_thread_num is smaller than previous value.
             _max_thread_num =
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 8a42bc037ca..7f7c550fad1 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,7 @@ public:
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
                    int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
-                   bool ignore_data_distribution);
+                   bool ignore_data_distribution, bool is_file_scan_operator);
 
     ~ScannerContext() override {
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -214,6 +214,7 @@ protected:
     QueryThreadContext _query_thread_context;
     std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
     bool _ignore_data_distribution = false;
+    bool _is_file_scan_operator = false;
 
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to