This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cd2d79c6e2d branch-3.0: [opt](remote scan) Fix remote scan parallelism (#43625)
cd2d79c6e2d is described below

commit cd2d79c6e2d592e4bb6f8fae3b701c137f9e2e39
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 11 22:53:57 2024 +0800

    branch-3.0: [opt](remote scan) Fix remote scan parallelism (#43625)

    Cherry-picked from #43532

    Co-authored-by: zhiqiang <seuhezhiqi...@163.com>
    Co-authored-by: zhiqiang-hhhh <hezhiqi...@flywheels.com>
---
 be/src/pipeline/exec/scan_operator.cpp   |  2 +-
 be/src/vec/exec/scan/scanner_context.cpp | 16 +++++++++++-----
 be/src/vec/exec/scan/scanner_context.h   |  3 ++-
 3 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 880a3d6e513..32943c4d44e 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -994,7 +994,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
-            _scan_dependency, p.is_serial_operator());
+            _scan_dependency, p.is_serial_operator(), p.is_file_scan_operator());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index bea222bd0f3..d37d26b09f7 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,7 +46,8 @@ ScannerContext::ScannerContext(
         RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
         const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
         const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
-        std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution)
+        std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution,
+        bool is_file_scan_operator)
         : HasTaskExecutionCtx(state),
           _state(state),
           _local_state(local_state),
@@ -58,7 +59,8 @@ ScannerContext::ScannerContext(
           limit(limit_),
           _scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
           _all_scanners(scanners.begin(), scanners.end()),
-          _ignore_data_distribution(ignore_data_distribution) {
+          _ignore_data_distribution(ignore_data_distribution),
+          _is_file_scan_operator(is_file_scan_operator) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
@@ -143,7 +145,10 @@ Status ScannerContext::init() {
     }
 
     // _scannner_scheduler will be used to submit scan task.
-    if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) {
+    // file_scan_operator currentlly has performance issue if we submit too many scan tasks to scheduler.
+    // we should fix this problem in the future.
+    if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size ||
+        _is_file_scan_operator) {
         submit_many_scan_tasks_for_potential_performance_issue = false;
     }
 
@@ -166,8 +171,9 @@ Status ScannerContext::init() {
     if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) {
         _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
     } else {
-        _max_thread_num =
-                4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances);
+        const size_t factor = _is_file_scan_operator ? 1 : 4;
+        _max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num /
+                                    num_parallel_instances);
         // In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed
         // in parallel. We need to make sure the _max_thread_num is smaller than previous value.
         _max_thread_num =
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 8a42bc037ca..7f7c550fad1 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,7 @@ public:
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
                    int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
-                   bool ignore_data_distribution);
+                   bool ignore_data_distribution, bool is_file_scan_operator);
 
     ~ScannerContext() override {
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -214,6 +214,7 @@ protected:
     QueryThreadContext _query_thread_context;
     std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
     bool _ignore_data_distribution = false;
+    bool _is_file_scan_operator = false;
 
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org