yiguolei commented on code in PR #28103:
URL: https://github.com/apache/doris/pull/28103#discussion_r1418537513


##########
be/src/olap/parallel_scanner_builder.cpp:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parallel_scanner_builder.h"
+
+#include "olap/rowset/beta_rowset.h"
+#include "vec/exec/scan/new_olap_scanner.h"
+
+namespace doris {
+

Review Comment:
   70 tablet --> 1 tablet , other 1000 rows
   48 scanner
   1 tablet ----> 48 scanner
   69 tablet---> 69 scanner 
   
   117 scanner ---> queue/ stack
   
   1 scanner thread
   1 scanner --> 1000 block     2000rows 
   1 scanner
   2 scanner 
   
   scanner thread pool 
    48
   
   10 scanner ---> queue/ stack
   



##########
be/src/vec/exec/scan/new_olap_scan_node.cpp:
##########
@@ -508,48 +503,87 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
     std::vector<std::pair<BaseTabletSPtr, int64_t /* version */>> 
tablets_to_scan;
     tablets_to_scan.reserve(_scan_ranges.size());
 
+    std::vector<TabletWithVersion> tablets;
+
     for (auto&& scan_range : _scan_ranges) {
         auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
         int64_t version = 0;
         std::from_chars(scan_range->version.data(),
                         scan_range->version.data() + 
scan_range->version.size(), version);
+        tablets.emplace_back(
+                TabletWithVersion {std::dynamic_pointer_cast<Tablet>(tablet), 
version});
         tablets_to_scan.emplace_back(std::move(tablet), version);
     }
 
+    bool is_dup_mow_key = true;
+    bool unique_keys_with_mor = false;
+    bool enable_parallel_scan = 
_state->query_options().__isset.enable_parallel_scan &&

Review Comment:
   Add a function to runtime_state named enable_parallel_scan() instead of 
reading the query options directly.



##########
be/src/olap/parallel_scanner_builder.h:
##########
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "olap/reader.h"
+#include "olap/rowset/segment_v2/row_ranges.h"
+#include "olap/segment_loader.h"
+#include "olap/tablet.h"
+#include "vec/exec/scan/new_olap_scanner.h"
+
+namespace doris {
+
+namespace vectorized {
+class VScanner;
+}
+
+using TabletSPtr = std::shared_ptr<Tablet>;
+using VScannerSPtr = std::shared_ptr<vectorized::VScanner>;
+
+struct TabletWithVersion {
+    TabletSPtr tablet;
+    int64_t version;
+};
+
+struct SegmentGroup {
+    RowsetSharedPtr rowset;
+    std::vector<Segment*> segments;
+    size_t num_blocks;
+    size_t num_rows;
+    std::string min_key;
+    std::string max_key;
+};
+
+template <typename ParentType>
+class ParallelScannerBuilder {
+public:
+    ParallelScannerBuilder(ParentType* parent, const 
std::vector<TabletWithVersion>& tablets,
+                           const std::shared_ptr<RuntimeProfile>& profile,
+                           const std::vector<OlapScanRange*>& key_ranges, 
RuntimeState* state,
+                           int64_t limit_per_scanner, bool is_dup_mow_key, 
bool is_preaggregation)
+            : _parent(parent),
+              _scanner_profile(profile),
+              _state(state),
+              _limit_per_scanner(limit_per_scanner),
+              _is_dup_mow_key(is_dup_mow_key),
+              _is_preaggregation(is_preaggregation),
+              _tablets(tablets.cbegin(), tablets.cend()),
+              _key_ranges(key_ranges.cbegin(), key_ranges.cend()) {}
+
+    Status build_scanners(std::list<VScannerSPtr>& scanners);
+
+    void set_max_scanners_count(size_t count) { _max_scanners_count = count; }
+
+private:
+    Status _load();
+
+    Status _build_scanners_by_rowid(std::list<VScannerSPtr>& scanners);
+
+    Status _build_scanners_by_key_range(std::list<VScannerSPtr>& scanners);
+
+    std::unique_ptr<SegmentGroup> _create_segment_group_from_rowsets(
+            const std::vector<RowsetSharedPtr>& rowsets);
+
+    Status _parse_segment_group(SegmentGroup& group);
+
+    Status _convert_key_ranges(const TabletSPtr& tablet, 
std::vector<std::string>& start_keys,
+                               std::vector<std::string>& end_keys,
+                               std::vector<bool>& include_begin_keys,
+                               std::vector<bool>& include_end_keys,
+                               std::shared_ptr<Schema>& schema);
+
+    std::shared_ptr<vectorized::NewOlapScanner> _build_scanner(
+            BaseTabletSPtr tablet, int64_t version, const 
std::vector<OlapScanRange*>& key_ranges,
+            TabletReader::ReadSource&& read_source);
+
+    ParentType* _parent;
+
+    /// Max scanners count limit to build
+    size_t _max_scanners_count {16};
+
+    /// Min rows per scanner
+    size_t _min_rows_per_scanner {2 * 1024 * 1024};

Review Comment:
   Make this a session variable instead of a hard-coded constant.



##########
be/src/olap/parallel_scanner_builder.h:
##########
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "olap/reader.h"
+#include "olap/rowset/segment_v2/row_ranges.h"
+#include "olap/segment_loader.h"
+#include "olap/tablet.h"
+#include "vec/exec/scan/new_olap_scanner.h"
+
+namespace doris {
+
+namespace vectorized {
+class VScanner;
+}
+
+using TabletSPtr = std::shared_ptr<Tablet>;
+using VScannerSPtr = std::shared_ptr<vectorized::VScanner>;
+
+struct TabletWithVersion {
+    TabletSPtr tablet;
+    int64_t version;
+};
+
+struct SegmentGroup {
+    RowsetSharedPtr rowset;
+    std::vector<Segment*> segments;
+    size_t num_blocks;
+    size_t num_rows;
+    std::string min_key;
+    std::string max_key;
+};
+
+template <typename ParentType>
+class ParallelScannerBuilder {
+public:
+    ParallelScannerBuilder(ParentType* parent, const 
std::vector<TabletWithVersion>& tablets,
+                           const std::shared_ptr<RuntimeProfile>& profile,
+                           const std::vector<OlapScanRange*>& key_ranges, 
RuntimeState* state,
+                           int64_t limit_per_scanner, bool is_dup_mow_key, 
bool is_preaggregation)
+            : _parent(parent),
+              _scanner_profile(profile),
+              _state(state),
+              _limit_per_scanner(limit_per_scanner),
+              _is_dup_mow_key(is_dup_mow_key),
+              _is_preaggregation(is_preaggregation),
+              _tablets(tablets.cbegin(), tablets.cend()),
+              _key_ranges(key_ranges.cbegin(), key_ranges.cend()) {}
+
+    Status build_scanners(std::list<VScannerSPtr>& scanners);
+
+    void set_max_scanners_count(size_t count) { _max_scanners_count = count; }
+
+private:
+    Status _load();
+
+    Status _build_scanners_by_rowid(std::list<VScannerSPtr>& scanners);
+
+    Status _build_scanners_by_key_range(std::list<VScannerSPtr>& scanners);
+
+    std::unique_ptr<SegmentGroup> _create_segment_group_from_rowsets(
+            const std::vector<RowsetSharedPtr>& rowsets);
+
+    Status _parse_segment_group(SegmentGroup& group);
+
+    Status _convert_key_ranges(const TabletSPtr& tablet, 
std::vector<std::string>& start_keys,
+                               std::vector<std::string>& end_keys,
+                               std::vector<bool>& include_begin_keys,
+                               std::vector<bool>& include_end_keys,
+                               std::shared_ptr<Schema>& schema);
+
+    std::shared_ptr<vectorized::NewOlapScanner> _build_scanner(
+            BaseTabletSPtr tablet, int64_t version, const 
std::vector<OlapScanRange*>& key_ranges,
+            TabletReader::ReadSource&& read_source);
+
+    ParentType* _parent;
+
+    /// Max scanners count limit to build
+    size_t _max_scanners_count {16};
+
+    /// Min rows per scanner
+    size_t _min_rows_per_scanner {2 * 1024 * 1024};
+
+    size_t _total_rows {};
+
+    size_t _rows_per_scanners {_min_rows_per_scanner};

Review Comment:
   Rename this to `_rows_per_scanner` (singular).



##########
be/src/vec/exec/scan/new_olap_scan_node.cpp:
##########
@@ -508,48 +503,87 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
     std::vector<std::pair<BaseTabletSPtr, int64_t /* version */>> 
tablets_to_scan;
     tablets_to_scan.reserve(_scan_ranges.size());
 
+    std::vector<TabletWithVersion> tablets;
+
     for (auto&& scan_range : _scan_ranges) {
         auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
         int64_t version = 0;
         std::from_chars(scan_range->version.data(),
                         scan_range->version.data() + 
scan_range->version.size(), version);
+        tablets.emplace_back(
+                TabletWithVersion {std::dynamic_pointer_cast<Tablet>(tablet), 
version});
         tablets_to_scan.emplace_back(std::move(tablet), version);
     }
 
+    bool is_dup_mow_key = true;
+    bool unique_keys_with_mor = false;
+    bool enable_parallel_scan = 
_state->query_options().__isset.enable_parallel_scan &&
+                                _state->query_options().enable_parallel_scan;
+
     // Split tablet segment by scanner, only use in pipeline in duplicate key
     // 1. if tablet count lower than scanner thread num, count segment num of 
all tablet ready for scan
     // TODO: some tablet may do not have segment, may need split segment all 
case
     if (_shared_scan_opt && _scan_ranges.size() < 
config::doris_scanner_thread_pool_thread_num) {
         for (auto&& [tablet, version] : tablets_to_scan) {
-            is_dup_mow_key =
+            unique_keys_with_mor |= tablet->keys_type() == UNIQUE_KEYS &&

Review Comment:
   Why is this related to shared scan?



##########
be/src/olap/parallel_scanner_builder.cpp:
##########
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parallel_scanner_builder.h"
+
+#include "exec/olap_utils.h"
+#include "olap/iterators.h"
+#include "olap/primary_key_index.h"
+#include "olap/row_cursor.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/segment_v2/segment.h"
+#include "olap/short_key_index.h"
+#include "util/key_util.h"
+#include "vec/exec/scan/new_olap_scanner.h"
+
+namespace doris {
+
+using namespace vectorized;
+
+using SplitKey = StorageReadOptions::SplitKey;
+using SplitKeyRange = StorageReadOptions::SplitKeyRange;
+
+template <typename ParentType>
+Status 
ParallelScannerBuilder<ParentType>::build_scanners(std::list<VScannerSPtr>& 
scanners) {
+    RETURN_IF_ERROR(_load());
+    if (_is_dup_mow_key) {
+        return _build_scanners_by_rowid(scanners);
+    } else {
+        return _build_scanners_by_key_range(scanners);
+    }
+}
+
+template <typename ParentType>
+Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
+        std::list<VScannerSPtr>& scanners) {
+    DCHECK_GE(_rows_per_scanners, _min_rows_per_scanner);
+    for (auto&& [tablet, version] : _tablets) {
+        DCHECK(_all_rowsets.contains(tablet->tablet_id()));
+        auto& rowsets = _all_rowsets[tablet->tablet_id()];
+
+        TabletReader::ReadSource reade_source_with_delete_info;
+        if (!_state->skip_delete_predicate()) {
+            RETURN_IF_ERROR(tablet->capture_rs_readers(
+                    {0, version}, &reade_source_with_delete_info.rs_splits, 
false));
+            reade_source_with_delete_info.fill_delete_predicates();
+        }
+
+        TabletReader::ReadSource read_source;
+
+        int64_t rows_collected = 0;
+        for (auto& rowset : rowsets) {
+            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+            RowsetReaderSharedPtr reader;
+            RETURN_IF_ERROR(beta_rowset->create_reader(&reader));
+            const auto rowset_id = beta_rowset->rowset_id();
+
+            DCHECK(_segment_cache_handles.contains(rowset_id));
+            auto& segment_cache_handle = _segment_cache_handles[rowset_id];
+
+            if (beta_rowset->num_rows() == 0) {
+                continue;
+            }
+
+            const auto& segments = segment_cache_handle.get_segments();
+            int segment_start = 0;
+            auto split = RowSetSplits(reader->clone());
+
+            for (size_t i = 0; i != segments.size(); ++i) {
+                const auto& segment = segments[i];
+                RowRanges row_ranges;
+                const size_t rows_of_segment = segment->num_rows();
+                int64_t offset_in_segment = 0;
+
+                while (offset_in_segment < rows_of_segment) {
+                    const int64_t remaining_rows = rows_of_segment - 
offset_in_segment;
+                    auto rows_need = _rows_per_scanners - rows_collected;
+                    if (rows_need >= remaining_rows * 0.9) {
+                        rows_need = remaining_rows;
+                    }
+                    DCHECK_LE(rows_need, remaining_rows);
+
+                    auto old_count = row_ranges.count();
+                    row_ranges.add({offset_in_segment,
+                                    offset_in_segment + 
static_cast<int64_t>(rows_need)});
+                    DCHECK_EQ(rows_need + old_count, row_ranges.count());
+                    rows_collected += rows_need;
+                    offset_in_segment += rows_need;
+
+                    if (rows_collected >= _rows_per_scanners) { // build a new 
scanner
+                        split.segment_offsets.first = segment_start,
+                        split.segment_offsets.second = i + 1;
+                        
split.segment_row_ranges.emplace_back(std::move(row_ranges));
+
+                        DCHECK_EQ(split.segment_offsets.second - 
split.segment_offsets.first,
+                                  split.segment_row_ranges.size());
+
+                        read_source.rs_splits.emplace_back(std::move(split));
+
+                        if (!_state->skip_delete_predicate()) {
+                            read_source.fill_delete_predicates();
+                        }
+
+                        scanners.emplace_back(
+                                _build_scanner(tablet, version, _key_ranges,
+                                               
{std::move(read_source.rs_splits),
+                                                
reade_source_with_delete_info.delete_predicates}));
+
+                        read_source = TabletReader::ReadSource();
+                        split = RowSetSplits(reader->clone());
+                        row_ranges = RowRanges();
+
+                        segment_start = offset_in_segment < rows_of_segment ? 
i : i + 1;
+                        rows_collected = 0;
+                    }
+                }
+
+                if (!row_ranges.is_empty()) {
+                    DCHECK_GT(rows_collected, 0);
+                    DCHECK_EQ(row_ranges.to(), segment->num_rows());
+                    
split.segment_row_ranges.emplace_back(std::move(row_ranges));
+                }
+            }
+
+            DCHECK_LE(rows_collected, _rows_per_scanners);
+            if (rows_collected > 0) {
+                split.segment_offsets.first = segment_start;
+                split.segment_offsets.second = segments.size();
+                DCHECK_GT(split.segment_offsets.second, 
split.segment_offsets.first);
+                DCHECK_EQ(split.segment_row_ranges.size(),
+                          split.segment_offsets.second - 
split.segment_offsets.first);
+                read_source.rs_splits.emplace_back(std::move(split));
+            }
+        } // end `for (auto& rowset : rowsets)`
+
+        DCHECK_LE(rows_collected, _rows_per_scanners);
+        if (rows_collected > 0) {
+            DCHECK_GT(read_source.rs_splits.size(), 0);
+#ifndef NDEBUG
+            for (auto& split : read_source.rs_splits) {
+                DCHECK(split.rs_reader != nullptr);
+                DCHECK_LT(split.segment_offsets.first, 
split.segment_offsets.second);
+                DCHECK_EQ(split.segment_row_ranges.size(),
+                          split.segment_offsets.second - 
split.segment_offsets.first);
+            }
+#endif
+            scanners.emplace_back(
+                    _build_scanner(tablet, version, _key_ranges,
+                                   {std::move(read_source.rs_splits),
+                                    
reade_source_with_delete_info.delete_predicates}));
+        }
+    }
+
+    return Status::OK();
+}
+template <typename ParentType>
+Status ParallelScannerBuilder<ParentType>::_load() {
+    for (auto&& [tablet, version] : _tablets) {
+        const auto tablet_id = tablet->tablet_id();
+        auto& rows = _tablets_rows.emplace_back(0);

Review Comment:
   This logic is very difficult to understand.



##########
be/src/vec/exec/scan/new_olap_scan_node.cpp:
##########
@@ -508,48 +503,87 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
     std::vector<std::pair<BaseTabletSPtr, int64_t /* version */>> 
tablets_to_scan;
     tablets_to_scan.reserve(_scan_ranges.size());
 
+    std::vector<TabletWithVersion> tablets;
+
     for (auto&& scan_range : _scan_ranges) {
         auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
         int64_t version = 0;
         std::from_chars(scan_range->version.data(),
                         scan_range->version.data() + 
scan_range->version.size(), version);
+        tablets.emplace_back(
+                TabletWithVersion {std::dynamic_pointer_cast<Tablet>(tablet), 
version});
         tablets_to_scan.emplace_back(std::move(tablet), version);
     }
 
+    bool is_dup_mow_key = true;
+    bool unique_keys_with_mor = false;
+    bool enable_parallel_scan = 
_state->query_options().__isset.enable_parallel_scan &&
+                                _state->query_options().enable_parallel_scan;
+
     // Split tablet segment by scanner, only use in pipeline in duplicate key
     // 1. if tablet count lower than scanner thread num, count segment num of 
all tablet ready for scan
     // TODO: some tablet may do not have segment, may need split segment all 
case
     if (_shared_scan_opt && _scan_ranges.size() < 
config::doris_scanner_thread_pool_thread_num) {
         for (auto&& [tablet, version] : tablets_to_scan) {
-            is_dup_mow_key =
+            unique_keys_with_mor |= tablet->keys_type() == UNIQUE_KEYS &&
+                                    
!tablet->enable_unique_key_merge_on_write();
+            is_dup_mow_key &=
                     tablet->keys_type() == DUP_KEYS || (tablet->keys_type() == 
UNIQUE_KEYS &&
                                                         
tablet->enable_unique_key_merge_on_write());
-            if (!is_dup_mow_key) {
-                break;
-            }
+            if (is_dup_mow_key) {
+                auto& read_source = tablets_read_source.emplace_back();
+                {
+                    std::shared_lock rdlock(tablet->get_header_lock());
+                    auto st =
+                            tablet->capture_rs_readers({0, version}, 
&read_source.rs_splits, false);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "fail to init reader.res=" << st;
+                        return Status::InternalError(
+                                "failed to initialize storage reader. 
tablet_id={} : {}",
+                                tablet->tablet_id(), st.to_string());
+                    }
+                }
+                if (!_state->skip_delete_predicate()) {
+                    read_source.fill_delete_predicates();
+                }
 
-            auto& read_source = tablets_read_source.emplace_back();
-            {
-                std::shared_lock rdlock(tablet->get_header_lock());
-                auto st = tablet->capture_rs_readers({0, version}, 
&read_source.rs_splits, false);
-                if (!st.ok()) {
-                    LOG(WARNING) << "fail to init reader.res=" << st;
-                    return Status::InternalError(
-                            "failed to initialize storage reader. tablet_id={} 
: {}",
-                            tablet->tablet_id(), st.to_string());
+                auto& rs_seg_count = tablet_rs_seg_count.emplace_back();
+                for (const auto& rowset_splits : read_source.rs_splits) {
+                    auto num_segments = 
rowset_splits.rs_reader->rowset()->num_segments();
+                    rs_seg_count.emplace_back(num_segments);
+                    segment_count += num_segments;
                 }
             }
-            if (!_state->skip_delete_predicate()) {
-                read_source.fill_delete_predicates();
-            }
+        }
+    } else {
+        enable_parallel_scan = false;
+        is_dup_mow_key = false;
+    }
 
-            auto& rs_seg_count = tablet_rs_seg_count.emplace_back();
-            for (const auto& rowset_splits : read_source.rs_splits) {
-                auto num_segments = 
rowset_splits.rs_reader->rowset()->num_segments();
-                rs_seg_count.emplace_back(num_segments);
-                segment_count += num_segments;
+    if (!unique_keys_with_mor && enable_parallel_scan && _push_down_agg_type 
== TPushAggOp::NONE) {
+        std::vector<OlapScanRange*> key_ranges;
+        for (auto& range : _cond_ranges) {
+            if (range->begin_scan_range.size() == 1 &&
+                range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) {
+                continue;
             }
+            key_ranges.emplace_back(range.get());
+        }
+
+        ParallelScannerBuilder<NewOlapScanNode> scanner_builder(
+                this, tablets, _scanner_profile, key_ranges, _state, 
_limit_per_scanner,
+                is_dup_mow_key, _olap_scan_node.is_preaggregation);
+
+        
scanner_builder.set_max_scanners_count(config::doris_scanner_thread_pool_thread_num);

Review Comment:
   Use a session variable to control the maximum number of scanners.



##########
be/src/olap/parallel_scanner_builder.cpp:
##########
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parallel_scanner_builder.h"
+
+#include "exec/olap_utils.h"
+#include "olap/iterators.h"
+#include "olap/primary_key_index.h"
+#include "olap/row_cursor.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/segment_v2/segment.h"
+#include "olap/short_key_index.h"
+#include "util/key_util.h"
+#include "vec/exec/scan/new_olap_scanner.h"
+
+namespace doris {
+
+using namespace vectorized;
+
+using SplitKey = StorageReadOptions::SplitKey;
+using SplitKeyRange = StorageReadOptions::SplitKeyRange;
+
+template <typename ParentType>
+Status 
ParallelScannerBuilder<ParentType>::build_scanners(std::list<VScannerSPtr>& 
scanners) {
+    RETURN_IF_ERROR(_load());
+    if (_is_dup_mow_key) {
+        return _build_scanners_by_rowid(scanners);
+    } else {
+        return _build_scanners_by_key_range(scanners);
+    }
+}
+
+template <typename ParentType>
+Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
+        std::list<VScannerSPtr>& scanners) {
+    DCHECK_GE(_rows_per_scanners, _min_rows_per_scanner);
+    for (auto&& [tablet, version] : _tablets) {
+        DCHECK(_all_rowsets.contains(tablet->tablet_id()));
+        auto& rowsets = _all_rowsets[tablet->tablet_id()];
+
+        TabletReader::ReadSource reade_source_with_delete_info;
+        if (!_state->skip_delete_predicate()) {
+            RETURN_IF_ERROR(tablet->capture_rs_readers(
+                    {0, version}, &reade_source_with_delete_info.rs_splits, 
false));
+            reade_source_with_delete_info.fill_delete_predicates();
+        }
+
+        TabletReader::ReadSource read_source;
+
+        int64_t rows_collected = 0;
+        for (auto& rowset : rowsets) {
+            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+            RowsetReaderSharedPtr reader;
+            RETURN_IF_ERROR(beta_rowset->create_reader(&reader));
+            const auto rowset_id = beta_rowset->rowset_id();
+
+            DCHECK(_segment_cache_handles.contains(rowset_id));
+            auto& segment_cache_handle = _segment_cache_handles[rowset_id];
+
+            if (beta_rowset->num_rows() == 0) {
+                continue;
+            }
+
+            const auto& segments = segment_cache_handle.get_segments();
+            int segment_start = 0;
+            auto split = RowSetSplits(reader->clone());
+
+            for (size_t i = 0; i != segments.size(); ++i) {
+                const auto& segment = segments[i];
+                RowRanges row_ranges;
+                const size_t rows_of_segment = segment->num_rows();
+                int64_t offset_in_segment = 0;
+
+                while (offset_in_segment < rows_of_segment) {
+                    const int64_t remaining_rows = rows_of_segment - 
offset_in_segment;
+                    auto rows_need = _rows_per_scanners - rows_collected;
+                    if (rows_need >= remaining_rows * 0.9) {
+                        rows_need = remaining_rows;
+                    }
+                    DCHECK_LE(rows_need, remaining_rows);
+
+                    auto old_count = row_ranges.count();
+                    row_ranges.add({offset_in_segment,

Review Comment:
   Add a comment describing the row range structure, for example whether it is 
   [offset, length], [start_offset, end_offset), or 
[start_offset, end_offset].



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to