This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c2fa60cbe52 [Enhancement](scan) enable parallel scan when preagg is on (#36302)
c2fa60cbe52 is described below

commit c2fa60cbe52b9ebdf26fbef1f6959519e6a86203
Author: Pxl <pxl...@qq.com>
AuthorDate: Fri Jun 14 23:44:41 2024 +0800

    [Enhancement](scan) enable parallel scan when preagg is on (#36302)
    
    ## Proposed changes
    pick from #35810
---
 be/src/pipeline/exec/olap_scan_operator.cpp | 69 +++++++++++++----------------
 1 file changed, 30 insertions(+), 39 deletions(-)

diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index c8a9aa3f85d..00650b8a976 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -265,18 +265,11 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
                          
state()->query_options().resource_limit.__isset.cpu_limit;
 
     if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
-        p._push_down_agg_type == TPushAggOp::NONE) {
+        p._push_down_agg_type == TPushAggOp::NONE &&
+        (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
         std::vector<TabletWithVersion> tablets;
-        bool is_dup_mow_key = true;
         for (auto&& scan_range : _scan_ranges) {
             auto tablet = 
DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
-            is_dup_mow_key =
-                    tablet->keys_type() == DUP_KEYS || (tablet->keys_type() == 
UNIQUE_KEYS &&
-                                                        
tablet->enable_unique_key_merge_on_write());
-            if (!is_dup_mow_key) {
-                break;
-            }
-
             int64_t version = 0;
             std::from_chars(scan_range->version.data(),
                             scan_range->version.data() + 
scan_range->version.size(), version);
@@ -284,42 +277,40 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
                     TabletWithVersion 
{std::dynamic_pointer_cast<Tablet>(tablet), version});
         }
 
-        if (is_dup_mow_key) {
-            std::vector<OlapScanRange*> key_ranges;
-            for (auto& range : _cond_ranges) {
-                if (range->begin_scan_range.size() == 1 &&
-                    range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) 
{
-                    continue;
-                }
-                key_ranges.emplace_back(range.get());
+        std::vector<OlapScanRange*> key_ranges;
+        for (auto& range : _cond_ranges) {
+            if (range->begin_scan_range.size() == 1 &&
+                range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) {
+                continue;
             }
+            key_ranges.emplace_back(range.get());
+        }
 
-            ParallelScannerBuilder<OlapScanLocalState> scanner_builder(
-                    this, tablets, _scanner_profile, key_ranges, state(), 
p._limit_per_scanner,
-                    is_dup_mow_key, p._olap_scan_node.is_preaggregation);
+        ParallelScannerBuilder<OlapScanLocalState> scanner_builder(
+                this, tablets, _scanner_profile, key_ranges, state(), 
p._limit_per_scanner, true,
+                p._olap_scan_node.is_preaggregation);
 
-            int max_scanners_count = 
state()->parallel_scan_max_scanners_count();
+        int max_scanners_count = state()->parallel_scan_max_scanners_count();
 
-            // If the `max_scanners_count` was not set,
-            // use `config::doris_scanner_thread_pool_thread_num` as the 
default value.
-            if (max_scanners_count <= 0) {
-                max_scanners_count = 
config::doris_scanner_thread_pool_thread_num;
-            }
+        // If the `max_scanners_count` was not set,
+        // use `config::doris_scanner_thread_pool_thread_num` as the default 
value.
+        if (max_scanners_count <= 0) {
+            max_scanners_count = config::doris_scanner_thread_pool_thread_num;
+        }
 
-            // Too small value of `min_rows_per_scanner` is meaningless.
-            auto min_rows_per_scanner =
-                    std::max<int64_t>(1024, 
state()->parallel_scan_min_rows_per_scanner());
-            scanner_builder.set_max_scanners_count(max_scanners_count);
-            scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner);
-
-            RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
-            for (auto& scanner : *scanners) {
-                auto* olap_scanner = 
assert_cast<vectorized::NewOlapScanner*>(scanner.get());
-                RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
-                olap_scanner->set_compound_filters(_compound_filters);
-            }
-            return Status::OK();
+        // Too small value of `min_rows_per_scanner` is meaningless.
+        auto min_rows_per_scanner =
+                std::max<int64_t>(1024, 
state()->parallel_scan_min_rows_per_scanner());
+        scanner_builder.set_max_scanners_count(max_scanners_count);
+        scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner);
+
+        RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
+        for (auto& scanner : *scanners) {
+            auto* olap_scanner = 
assert_cast<vectorized::NewOlapScanner*>(scanner.get());
+            RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
+            olap_scanner->set_compound_filters(_compound_filters);
         }
+        return Status::OK();
     }
 
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to