This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new c2fa60cbe52 [Enchancement](scan) enable parallel scan when preagg is on (#36302) c2fa60cbe52 is described below commit c2fa60cbe52b9ebdf26fbef1f6959519e6a86203 Author: Pxl <pxl...@qq.com> AuthorDate: Fri Jun 14 23:44:41 2024 +0800 [Enchancement](scan) enable parallel scan when preagg is on (#36302) ## Proposed changes pick from #35810 --- be/src/pipeline/exec/olap_scan_operator.cpp | 69 +++++++++++++---------------- 1 file changed, 30 insertions(+), 39 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index c8a9aa3f85d..00650b8a976 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -265,18 +265,11 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s state()->query_options().resource_limit.__isset.cpu_limit; if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit && - p._push_down_agg_type == TPushAggOp::NONE) { + p._push_down_agg_type == TPushAggOp::NONE && + (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) { std::vector<TabletWithVersion> tablets; - bool is_dup_mow_key = true; for (auto&& scan_range : _scan_ranges) { auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); - is_dup_mow_key = - tablet->keys_type() == DUP_KEYS || (tablet->keys_type() == UNIQUE_KEYS && - tablet->enable_unique_key_merge_on_write()); - if (!is_dup_mow_key) { - break; - } - int64_t version = 0; std::from_chars(scan_range->version.data(), scan_range->version.data() + scan_range->version.size(), version); @@ -284,42 +277,40 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s TabletWithVersion {std::dynamic_pointer_cast<Tablet>(tablet), version}); } - if (is_dup_mow_key) { - std::vector<OlapScanRange*> key_ranges; - for (auto& range : _cond_ranges) { - if (range->begin_scan_range.size() == 1 && - range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) { - continue; - } - key_ranges.emplace_back(range.get()); + std::vector<OlapScanRange*> key_ranges; + for (auto& range : _cond_ranges) { + if (range->begin_scan_range.size() == 1 && + range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) { + continue; } + key_ranges.emplace_back(range.get()); + } - ParallelScannerBuilder<OlapScanLocalState> scanner_builder( - this, tablets, _scanner_profile, key_ranges, state(), p._limit_per_scanner, - is_dup_mow_key, p._olap_scan_node.is_preaggregation); + ParallelScannerBuilder<OlapScanLocalState> scanner_builder( + this, tablets, _scanner_profile, key_ranges, state(), p._limit_per_scanner, true, + p._olap_scan_node.is_preaggregation); - int max_scanners_count = state()->parallel_scan_max_scanners_count(); + int max_scanners_count = state()->parallel_scan_max_scanners_count(); - // If the `max_scanners_count` was not set, - // use `config::doris_scanner_thread_pool_thread_num` as the default value. - if (max_scanners_count <= 0) { - max_scanners_count = config::doris_scanner_thread_pool_thread_num; - } + // If the `max_scanners_count` was not set, + // use `config::doris_scanner_thread_pool_thread_num` as the default value. + if (max_scanners_count <= 0) { + max_scanners_count = config::doris_scanner_thread_pool_thread_num; + } - // Too small value of `min_rows_per_scanner` is meaningless. - auto min_rows_per_scanner = - std::max<int64_t>(1024, state()->parallel_scan_min_rows_per_scanner()); - scanner_builder.set_max_scanners_count(max_scanners_count); - scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner); - - RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners)); - for (auto& scanner : *scanners) { - auto* olap_scanner = assert_cast<vectorized::NewOlapScanner*>(scanner.get()); - RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts)); - olap_scanner->set_compound_filters(_compound_filters); - } - return Status::OK(); + // Too small value of `min_rows_per_scanner` is meaningless. + auto min_rows_per_scanner = + std::max<int64_t>(1024, state()->parallel_scan_min_rows_per_scanner()); + scanner_builder.set_max_scanners_count(max_scanners_count); + scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner); + + RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners)); + for (auto& scanner : *scanners) { + auto* olap_scanner = assert_cast<vectorized::NewOlapScanner*>(scanner.get()); + RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts)); + olap_scanner->set_compound_filters(_compound_filters); } + return Status::OK(); } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org