This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 103c473b96 [Bug](pipeline) fix pipeline shared scan + topn optimization (#21940)
103c473b96 is described below

commit 103c473b967477b0748a9e7afa9f781f65d9a5eb
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Jul 25 12:48:27 2023 +0800

    [Bug](pipeline) fix pipeline shared scan + topn optimization (#21940)
---
 be/src/olap/merger.cpp                      |  26 +++--
 be/src/olap/reader.cpp                      |   4 +-
 be/src/olap/reader.h                        |  17 ++--
 be/src/olap/rowset/beta_rowset_reader.cpp   |  10 +-
 be/src/olap/rowset/beta_rowset_reader.h     |   6 +-
 be/src/olap/rowset/rowset_reader.h          |  19 +++-
 be/src/olap/schema_change.cpp               |  17 ++--
 be/src/olap/tablet.cpp                      |  10 +-
 be/src/olap/tablet.h                        |   4 +-
 be/src/vec/exec/scan/new_olap_scan_node.cpp | 148 +++++++++++++++-------------
 be/src/vec/exec/scan/new_olap_scanner.cpp   |  53 +++++-----
 be/src/vec/exec/scan/new_olap_scanner.h     |   6 +-
 be/src/vec/olap/block_reader.cpp            |  57 +++++++----
 be/src/vec/olap/block_reader.h              |   2 +-
 be/src/vec/olap/vcollect_iterator.cpp       |  25 ++---
 be/src/vec/olap/vcollect_iterator.h         |   9 +-
 be/src/vec/olap/vertical_block_reader.cpp   |  18 ++--
 be/test/olap/tablet_test.cpp                |   8 +-
 18 files changed, 240 insertions(+), 199 deletions(-)

diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 44fd3d510e..ee1bffa9e2 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -60,7 +60,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
     TabletReader::ReaderParams reader_params;
     reader_params.tablet = tablet;
     reader_params.reader_type = reader_type;
-    reader_params.rs_readers = src_rowset_readers;
+    reader_params.rs_splits.reserve(src_rowset_readers.size());
+    for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
+        reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader));
+    }
     reader_params.version = dst_rowset_writer->version();
 
     TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
@@ -92,10 +95,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
         
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
         // init segment rowid map for rowid conversion
         std::vector<uint32_t> segment_num_rows;
-        for (auto& rs_reader : reader_params.rs_readers) {
-            
RETURN_IF_ERROR(rs_reader->get_segment_num_rows(&segment_num_rows));
-            
stats_output->rowid_conversion->init_segment_map(rs_reader->rowset()->rowset_id(),
-                                                             segment_num_rows);
+        for (auto& rs_split : reader_params.rs_splits) {
+            
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
+            stats_output->rowid_conversion->init_segment_map(
+                    rs_split.rs_reader->rowset()->rowset_id(), 
segment_num_rows);
         }
     }
 
@@ -194,7 +197,10 @@ Status Merger::vertical_compact_one_group(
     reader_params.is_key_column_group = is_key;
     reader_params.tablet = tablet;
     reader_params.reader_type = reader_type;
-    reader_params.rs_readers = src_rowset_readers;
+    reader_params.rs_splits.reserve(src_rowset_readers.size());
+    for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
+        reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader));
+    }
     reader_params.version = dst_rowset_writer->version();
 
     TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
@@ -225,10 +231,10 @@ Status Merger::vertical_compact_one_group(
         
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
         // init segment rowid map for rowid conversion
         std::vector<uint32_t> segment_num_rows;
-        for (auto& rs_reader : reader_params.rs_readers) {
-            
RETURN_IF_ERROR(rs_reader->get_segment_num_rows(&segment_num_rows));
-            
stats_output->rowid_conversion->init_segment_map(rs_reader->rowset()->rowset_id(),
-                                                             segment_num_rows);
+        for (auto& rs_split : reader_params.rs_splits) {
+            
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
+            stats_output->rowid_conversion->init_segment_map(
+                    rs_split.rs_reader->rowset()->rowset_id(), 
segment_num_rows);
         }
     }
 
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 468c6289d5..1a5516f273 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -153,7 +153,7 @@ bool TabletReader::_optimize_for_single_rowset(
 }
 
 Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
-    if (read_params.rs_readers.empty()) {
+    if (read_params.rs_splits.empty()) {
         return Status::InternalError("fail to acquire data sources. tablet={}",
                                      _tablet->full_name());
     }
@@ -636,7 +636,7 @@ Status TabletReader::init_reader_params_and_create_block(
     for (auto& rowset : input_rowsets) {
         RowsetReaderSharedPtr rs_reader;
         RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
-        reader_params->rs_readers.push_back(std::move(rs_reader));
+        reader_params->rs_splits.push_back(RowSetSplits(std::move(rs_reader)));
     }
 
     std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size());
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 231ff23d6e..249c199781 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -93,6 +93,16 @@ public:
     // Params for Reader,
     // mainly include tablet, data version and fetch range.
     struct ReaderParams {
+        bool has_single_version() const {
+            return (rs_splits.size() == 1 &&
+                    rs_splits[0].rs_reader->rowset()->start_version() == 0 &&
+                    
!rs_splits[0].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) ||
+                   (rs_splits.size() == 2 &&
+                    
rs_splits[0].rs_reader->rowset()->rowset_meta()->num_rows() == 0 &&
+                    rs_splits[1].rs_reader->rowset()->start_version() == 2 &&
+                    
!rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping());
+        }
+
         TabletSharedPtr tablet;
         TabletSchemaSPtr tablet_schema;
         ReaderType reader_type = ReaderType::READER_QUERY;
@@ -118,12 +128,7 @@ public:
 
         // For unique key table with merge-on-write
         DeleteBitmap* delete_bitmap {nullptr};
-
-        std::vector<RowsetReaderSharedPtr> rs_readers;
-        // if rs_readers_segment_offsets is not empty, means we only scan
-        // [pair.first, pair.second) segment in rs_reader, only effective in 
dup key
-        // and pipeline
-        std::vector<std::pair<int, int>> rs_readers_segment_offsets;
+        std::vector<RowSetSplits> rs_splits;
 
         // return_columns is init from query schema
         std::vector<uint32_t> return_columns;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index 7ba0260801..bdb0e1fca6 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -76,8 +76,7 @@ bool BetaRowsetReader::update_profile(RuntimeProfile* profile) {
 
 Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* 
read_context,
                                                
std::vector<RowwiseIteratorUPtr>* out_iters,
-                                               const std::pair<int, int>& 
segment_offset,
-                                               bool use_cache) {
+                                               const RowSetSplits& rs_splits, 
bool use_cache) {
     RETURN_IF_ERROR(_rowset->load());
     _context = read_context;
     // The segment iterator is created with its own statistics,
@@ -218,7 +217,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
 
     // create iterator for each segment
     auto& segments = _segment_cache_handle.get_segments();
-    auto [seg_start, seg_end] = segment_offset;
+    auto [seg_start, seg_end] = rs_splits.segment_offsets;
     if (seg_start == seg_end) {
         seg_start = 0;
         seg_end = segments.size();
@@ -241,12 +240,11 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     return Status::OK();
 }
 
-Status BetaRowsetReader::init(RowsetReaderContext* read_context,
-                              const std::pair<int, int>& segment_offset) {
+Status BetaRowsetReader::init(RowsetReaderContext* read_context, const 
RowSetSplits& rs_splits) {
     _context = read_context;
     _context->rowset_id = _rowset->rowset_id();
     std::vector<RowwiseIteratorUPtr> iterators;
-    RETURN_IF_ERROR(get_segment_iterators(_context, &iterators, 
segment_offset));
+    RETURN_IF_ERROR(get_segment_iterators(_context, &iterators, rs_splits));
 
     // merge or union segment iterator
     if (read_context->need_ordered_result && 
_rowset->rowset_meta()->is_segments_overlapping()) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h
index 83bc94ea0b..74ef9c96a6 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -46,13 +46,11 @@ public:
 
     ~BetaRowsetReader() override { _rowset->release(); }
 
-    Status init(RowsetReaderContext* read_context,
-                const std::pair<int, int>& segment_offset) override;
+    Status init(RowsetReaderContext* read_context, const RowSetSplits& 
rs_splits) override;
 
     Status get_segment_iterators(RowsetReaderContext* read_context,
                                  std::vector<RowwiseIteratorUPtr>* out_iters,
-                                 const std::pair<int, int>& segment_offset,
-                                 bool use_cache = false) override;
+                                 const RowSetSplits& rs_splits, bool use_cache 
= false) override;
     void reset_read_options() override;
     Status next_block(vectorized::Block* block) override;
     Status next_block_view(vectorized::BlockView* block_view) override;
diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h
index dadfd0e05c..cbe005b9bd 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -37,17 +37,28 @@ class Block;
 class RowsetReader;
 using RowsetReaderSharedPtr = std::shared_ptr<RowsetReader>;
 
+struct RowSetSplits {
+    RowsetReaderSharedPtr rs_reader;
+
+    // if segment_offsets is not empty, means we only scan
+    // [pair.first, pair.second) segment in rs_reader, only effective in dup 
key
+    // and pipeline
+    std::pair<int, int> segment_offsets;
+
+    RowSetSplits(RowsetReaderSharedPtr rs_reader_)
+            : rs_reader(rs_reader_), segment_offsets({0, 0}) {}
+    RowSetSplits() = default;
+};
+
 class RowsetReader {
 public:
     virtual ~RowsetReader() = default;
 
-    // reader init
-    virtual Status init(RowsetReaderContext* read_context,
-                        const std::pair<int, int>& segment_offset = {0, 0}) = 
0;
+    virtual Status init(RowsetReaderContext* read_context, const RowSetSplits& 
rs_splits = {}) = 0;
 
     virtual Status get_segment_iterators(RowsetReaderContext* read_context,
                                          std::vector<RowwiseIteratorUPtr>* 
out_iters,
-                                         const std::pair<int, int>& 
segment_offset = {0, 0},
+                                         const RowSetSplits& rs_splits = {},
                                          bool use_cache = false) = 0;
     virtual void reset_read_options() = 0;
 
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index aff67f68bb..f2f69f241d 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -722,7 +722,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
     // reader_context is stack variables, it's lifetime should keep the same
     // with rs_readers
     RowsetReaderContext reader_context;
-    std::vector<RowsetReaderSharedPtr> rs_readers;
+    std::vector<RowSetSplits> rs_splits;
     // delete handlers for new tablet
     DeleteHandler delete_handler;
     std::vector<ColumnId> return_columns;
@@ -814,11 +814,11 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             }
 
             // acquire data sources correspond to history versions
-            base_tablet->capture_rs_readers(versions_to_be_changed, 
&rs_readers);
-            if (rs_readers.empty()) {
+            base_tablet->capture_rs_readers(versions_to_be_changed, 
&rs_splits);
+            if (rs_splits.empty()) {
                 res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(
                         "fail to acquire all data sources. version_num={}, 
data_source_num={}",
-                        versions_to_be_changed.size(), rs_readers.size());
+                        versions_to_be_changed.size(), rs_splits.size());
                 break;
             }
             auto& all_del_preds = base_tablet->delete_predicates();
@@ -846,8 +846,8 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
             reader_context.delete_bitmap = 
&base_tablet->tablet_meta()->delete_bitmap();
             reader_context.version = Version(0, end_version);
-            for (auto& rs_reader : rs_readers) {
-                res = rs_reader->init(&reader_context);
+            for (auto& rs_split : rs_splits) {
+                res = rs_split.rs_reader->init(&reader_context);
                 if (!res) {
                     LOG(WARNING) << "failed to init rowset reader: " << 
base_tablet->full_name();
                     break;
@@ -865,7 +865,10 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
         DescriptorTbl::create(&sc_params.pool, request.desc_tbl, 
&sc_params.desc_tbl);
         sc_params.base_tablet = base_tablet;
         sc_params.new_tablet = new_tablet;
-        sc_params.ref_rowset_readers = rs_readers;
+        sc_params.ref_rowset_readers.reserve(rs_splits.size());
+        for (RowSetSplits& split : rs_splits) {
+            sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
+        }
         sc_params.delete_handler = &delete_handler;
         sc_params.base_tablet_schema = base_tablet_schema;
         sc_params.be_exec_version = request.be_exec_version;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 3f2c1829a5..9348f78119 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -924,16 +924,16 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector<Version>&
 }
 
 Status Tablet::capture_rs_readers(const Version& spec_version,
-                                  std::vector<RowsetReaderSharedPtr>* 
rs_readers) const {
+                                  std::vector<RowSetSplits>* rs_splits) const {
     std::vector<Version> version_path;
     RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path));
-    RETURN_IF_ERROR(capture_rs_readers(version_path, rs_readers));
+    RETURN_IF_ERROR(capture_rs_readers(version_path, rs_splits));
     return Status::OK();
 }
 
 Status Tablet::capture_rs_readers(const std::vector<Version>& version_path,
-                                  std::vector<RowsetReaderSharedPtr>* 
rs_readers) const {
-    DCHECK(rs_readers != nullptr && rs_readers->empty());
+                                  std::vector<RowSetSplits>* rs_splits) const {
+    DCHECK(rs_splits != nullptr && rs_splits->empty());
     for (auto version : version_path) {
         auto it = _rs_version_map.find(version);
         if (it == _rs_version_map.end()) {
@@ -954,7 +954,7 @@ Status Tablet::capture_rs_readers(const std::vector<Version>& version_path,
             return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
                     "failed to create reader for rowset:{}", 
it->second->rowset_id().to_string());
         }
-        rs_readers->push_back(std::move(rs_reader));
+        rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
     }
     return Status::OK();
 }
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 1688dcf2a6..3efd2c89ce 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -184,10 +184,10 @@ public:
     Status capture_consistent_rowsets(const Version& spec_version,
                                       std::vector<RowsetSharedPtr>* rowsets) 
const;
     Status capture_rs_readers(const Version& spec_version,
-                              std::vector<RowsetReaderSharedPtr>* rs_readers) 
const;
+                              std::vector<RowSetSplits>* rs_splits) const;
 
     Status capture_rs_readers(const std::vector<Version>& version_path,
-                              std::vector<RowsetReaderSharedPtr>* rs_readers) 
const;
+                              std::vector<RowSetSplits>* rs_splits) const;
 
     const std::vector<RowsetMetaSharedPtr> delete_predicates() {
         return _tablet_meta->delete_predicates();
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index cfd9e329f0..f12533431f 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -454,9 +454,9 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
 
     bool is_duplicate_key = false;
-    int segment_count = 0;
-    std::vector<std::vector<RowsetReaderSharedPtr>> 
rowset_readers_vector(_scan_ranges.size());
-    std::vector<std::vector<int>> tablet_rs_seg_count(_scan_ranges.size());
+    size_t segment_count = 0;
+    std::vector<std::vector<RowSetSplits>> 
rowset_splits_vector(_scan_ranges.size());
+    std::vector<std::vector<size_t>> tablet_rs_seg_count(_scan_ranges.size());
 
     // Split tablet segment by scanner, only use in pipeline in duplicate key
     // 1. if tablet count lower than scanner thread num, count segment num of 
all tablet ready for scan
@@ -484,7 +484,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
             // to prevent this case: when there are lots of olap scanners to 
run for example 10000
             // the rowsets maybe compacted when the last olap scanner starts
             Status acquire_reader_st =
-                    tablet->capture_rs_readers({0, version}, 
&rowset_readers_vector[i]);
+                    tablet->capture_rs_readers({0, version}, 
&rowset_splits_vector[i]);
             if (!acquire_reader_st.ok()) {
                 LOG(WARNING) << "fail to init reader.res=" << 
acquire_reader_st;
                 std::stringstream ss;
@@ -494,31 +494,30 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
                 return Status::InternalError(ss.str());
             }
 
-            for (const auto& rowset_reader : rowset_readers_vector[i]) {
-                auto num_segments = rowset_reader->rowset()->num_segments();
+            for (const auto& rowset_splits : rowset_splits_vector[i]) {
+                auto num_segments = 
rowset_splits.rs_reader->rowset()->num_segments();
                 tablet_rs_seg_count[i].emplace_back(num_segments);
                 segment_count += num_segments;
             }
         }
     }
 
-    auto build_new_scanner = [&](const TPaloScanRange& scan_range,
-                                 const std::vector<OlapScanRange*>& key_ranges,
-                                 const std::vector<RowsetReaderSharedPtr>& 
rs_readers,
-                                 const std::vector<std::pair<int, int>>& 
rs_reader_seg_offsets) {
-        std::shared_ptr<NewOlapScanner> scanner = 
NewOlapScanner::create_shared(
-                _state, this, _limit_per_scanner, 
_olap_scan_node.is_preaggregation, scan_range,
-                key_ranges, rs_readers, rs_reader_seg_offsets, 
_scanner_profile.get());
-
-        RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
-        scanner->set_compound_filters(_compound_filters);
-        scanners->push_back(scanner);
-        return Status::OK();
-    };
     if (is_duplicate_key) {
-        // 2. Split by segment count, each scanner need scan avg segment count
-        auto avg_segment_count =
-                std::max(segment_count / 
config::doris_scanner_thread_pool_thread_num, 1);
+        auto build_new_scanner = [&](const TPaloScanRange& scan_range,
+                                     const std::vector<OlapScanRange*>& 
key_ranges,
+                                     const std::vector<RowSetSplits>& 
rs_splits) {
+            std::shared_ptr<NewOlapScanner> scanner = 
NewOlapScanner::create_shared(
+                    _state, this, _limit_per_scanner, 
_olap_scan_node.is_preaggregation, scan_range,
+                    key_ranges, rs_splits, _scanner_profile.get());
+
+            RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
+            scanner->set_compound_filters(_compound_filters);
+            scanners->push_back(scanner);
+            return Status::OK();
+        };
+        // 2. Split segment evenly to each scanner (e.g. each scanner need to 
scan `avg_segment_count_per_scanner` segments)
+        const auto avg_segment_count_by_scanner =
+                std::max(segment_count / 
config::doris_scanner_thread_pool_thread_num, (size_t)1);
         for (int i = 0; i < _scan_ranges.size(); ++i) {
             auto& scan_range = _scan_ranges[i];
             std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = 
&_cond_ranges;
@@ -528,69 +527,84 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
                 scanner_ranges[j] = (*ranges)[j].get();
             }
 
+            // Segment count in current rowset vector
             const auto& rs_seg_count = tablet_rs_seg_count[i];
-            int rs_seg_count_index = 0;
-            int rs_seg_start_scan = 0;
-            int scanner_seg_occupy = 0;
-            std::vector<RowsetReaderSharedPtr> rs_readers;
-            std::vector<std::pair<int, int>> rs_reader_seg_offsets;
 
-            while (rs_seg_count_index < rs_seg_count.size()) {
+            size_t rowset_idx = 0;
+            size_t segment_idx_to_scan = 0;
+            size_t num_segments_assigned = 0;
+
+            std::vector<RowSetSplits> rs_splits;
+
+            while (rowset_idx < rs_seg_count.size()) {
                 // do not generator range of segment (0, 0)
-                if (rs_seg_count[rs_seg_count_index] == 0) {
-                    rs_seg_start_scan = 0;
-                    rs_seg_count_index++;
+                if (rs_seg_count[rowset_idx] == 0) {
+                    segment_idx_to_scan = 0;
+                    rowset_idx++;
                     continue;
                 }
 
-                auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] - 
rs_seg_start_scan;
-                
rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone());
-
-                if (scanner_seg_occupy + max_add_seg_nums > avg_segment_count) 
{
-                    auto need_add_seg_nums = avg_segment_count - 
scanner_seg_occupy;
-                    rs_reader_seg_offsets.emplace_back(
-                            rs_seg_start_scan,
-                            rs_seg_start_scan + need_add_seg_nums); // only 
scan need_add_seg_nums
-                    RETURN_IF_ERROR(build_new_scanner(*scan_range, 
scanner_ranges, rs_readers,
-                                                      rs_reader_seg_offsets));
-
-                    rs_seg_start_scan += need_add_seg_nums;
-                    scanner_seg_occupy = 0;
-                    rs_readers.clear();
-                    rs_reader_seg_offsets.clear();
-                } else if (scanner_seg_occupy + max_add_seg_nums == 
avg_segment_count) {
-                    rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
-                                                       
rs_seg_count[rs_seg_count_index]);
-                    RETURN_IF_ERROR(build_new_scanner(*scan_range, 
scanner_ranges, rs_readers,
-                                                      rs_reader_seg_offsets));
-
-                    rs_seg_start_scan = 0;
-                    scanner_seg_occupy = 0;
-                    rs_readers.clear();
-                    rs_reader_seg_offsets.clear();
-                    rs_seg_count_index++;
+                const auto max_add_seg_nums = rs_seg_count[rowset_idx] - 
segment_idx_to_scan;
+                rs_splits.emplace_back(RowSetSplits());
+                rs_splits.back().rs_reader = 
rowset_splits_vector[i][rowset_idx].rs_reader->clone();
+
+                // if segments assigned to current scanner are already more 
than the average count,
+                // this scanner will just scan segments equal to the average 
count
+                if (num_segments_assigned + max_add_seg_nums > 
avg_segment_count_by_scanner) {
+                    auto need_add_seg_nums = avg_segment_count_by_scanner - 
num_segments_assigned;
+                    rs_splits.back().segment_offsets = {
+                            segment_idx_to_scan,
+                            segment_idx_to_scan + need_add_seg_nums}; // only 
scan need_add_seg_nums
+
+                    RETURN_IF_ERROR(build_new_scanner(*scan_range, 
scanner_ranges, rs_splits));
+
+                    segment_idx_to_scan += need_add_seg_nums;
+                    num_segments_assigned = 0;
+                    rs_splits.clear();
+                } else if (num_segments_assigned + max_add_seg_nums ==
+                           avg_segment_count_by_scanner) {
+                    rs_splits.back().segment_offsets = {segment_idx_to_scan,
+                                                        
rs_seg_count[rowset_idx]};
+                    RETURN_IF_ERROR(build_new_scanner(*scan_range, 
scanner_ranges, rs_splits));
+
+                    segment_idx_to_scan = 0;
+                    num_segments_assigned = 0;
+                    rs_splits.clear();
+                    rowset_idx++;
                 } else {
-                    rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
-                                                       
rs_seg_count[rs_seg_count_index]);
+                    rs_splits.back().segment_offsets = {segment_idx_to_scan,
+                                                        
rs_seg_count[rowset_idx]};
 
-                    rs_seg_start_scan = 0;
-                    scanner_seg_occupy += max_add_seg_nums;
-                    rs_seg_count_index++;
+                    segment_idx_to_scan = 0;
+                    num_segments_assigned += max_add_seg_nums;
+                    rowset_idx++;
                 }
             }
 
 #ifndef NDEBUG
-            for (const auto& offset : rs_reader_seg_offsets) {
-                DCHECK_NE(offset.first, offset.second);
+            for (const auto& rs_reader_with_segments : rs_splits) {
+                DCHECK_NE(rs_reader_with_segments.segment_offsets.first,
+                          rs_reader_with_segments.segment_offsets.second);
             }
 #endif
 
             // dispose some segment tail
-            if (!rs_readers.empty()) {
-                build_new_scanner(*scan_range, scanner_ranges, rs_readers, 
rs_reader_seg_offsets);
+            if (!rs_splits.empty()) {
+                build_new_scanner(*scan_range, scanner_ranges, rs_splits);
             }
         }
     } else {
+        auto build_new_scanner = [&](const TPaloScanRange& scan_range,
+                                     const std::vector<OlapScanRange*>& 
key_ranges) {
+            std::shared_ptr<NewOlapScanner> scanner = 
NewOlapScanner::create_shared(
+                    _state, this, _limit_per_scanner, 
_olap_scan_node.is_preaggregation, scan_range,
+                    key_ranges, _scanner_profile.get());
+
+            RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
+            scanner->set_compound_filters(_compound_filters);
+            scanners->push_back(scanner);
+            return Status::OK();
+        };
         for (auto& scan_range : _scan_ranges) {
             auto tablet_id = scan_range->tablet_id;
             auto [tablet, status] =
@@ -619,7 +633,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
                      ++j, ++i) {
                     scanner_ranges.push_back((*ranges)[i].get());
                 }
-                RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, 
{}, {}));
+                RETURN_IF_ERROR(build_new_scanner(*scan_range, 
scanner_ranges));
             }
         }
     }
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index b945d42f6c..decf12ee91 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -63,17 +63,26 @@ namespace doris::vectorized {
 NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, 
int64_t limit,
                                bool aggregation, const TPaloScanRange& 
scan_range,
                                const std::vector<OlapScanRange*>& key_ranges,
-                               const std::vector<RowsetReaderSharedPtr>& 
rs_readers,
-                               const std::vector<std::pair<int, int>>& 
rs_reader_seg_offsets,
                                RuntimeProfile* profile)
         : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
           _aggregation(aggregation),
           _version(-1),
           _scan_range(scan_range),
           _key_ranges(key_ranges) {
-    DCHECK(rs_readers.size() == rs_reader_seg_offsets.size());
-    _tablet_reader_params.rs_readers = rs_readers;
-    _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
+    _tablet_schema = std::make_shared<TabletSchema>();
+    _is_init = false;
+}
+
+NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, 
int64_t limit,
+                               bool aggregation, const TPaloScanRange& 
scan_range,
+                               const std::vector<OlapScanRange*>& key_ranges,
+                               const std::vector<RowSetSplits>& rs_splits, 
RuntimeProfile* profile)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
+          _aggregation(aggregation),
+          _version(-1),
+          _scan_range(scan_range),
+          _key_ranges(key_ranges) {
+    _tablet_reader_params.rs_splits = rs_splits;
     _tablet_schema = std::make_shared<TabletSchema>();
     _is_init = false;
 }
@@ -167,7 +176,7 @@ Status NewOlapScanner::init() {
 
         {
             std::shared_lock rdlock(_tablet->get_header_lock());
-            if (_tablet_reader_params.rs_readers.empty()) {
+            if (_tablet_reader_params.rs_splits.empty()) {
                 const RowsetSharedPtr rowset = 
_tablet->rowset_with_max_version();
                 if (rowset == nullptr) {
                     std::stringstream ss;
@@ -181,7 +190,7 @@ Status NewOlapScanner::init() {
                 // the rowsets maybe compacted when the last olap scanner starts
                 Version rd_version(0, _version);
                 Status acquire_reader_st =
-                        _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers);
+                        _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_splits);
                 if (!acquire_reader_st.ok()) {
                     LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
                     std::stringstream ss;
@@ -237,20 +246,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
         const FilterPredicates& filter_predicates,
         const std::vector<FunctionFilter>& function_filters) {
     // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
-    bool single_version =
-            (_tablet_reader_params.rs_readers.size() == 1 &&
-             _tablet_reader_params.rs_readers[0]->rowset()->start_version() == 0 &&
-             !_tablet_reader_params.rs_readers[0]
-                      ->rowset()
-                      ->rowset_meta()
-                      ->is_segments_overlapping()) ||
-            (_tablet_reader_params.rs_readers.size() == 2 &&
-             _tablet_reader_params.rs_readers[0]->rowset()->rowset_meta()->num_rows() == 0 &&
-             _tablet_reader_params.rs_readers[1]->rowset()->start_version() == 2 &&
-             !_tablet_reader_params.rs_readers[1]
-                      ->rowset()
-                      ->rowset_meta()
-                      ->is_segments_overlapping());
+    const bool single_version = _tablet_reader_params.has_single_version();
     auto real_parent = reinterpret_cast<NewOlapScanNode*>(_parent);
     if (_state->skip_storage_engine_merge()) {
         _tablet_reader_params.direct_mode = true;
@@ -408,12 +404,13 @@ Status NewOlapScanner::_init_tablet_reader_params(
     // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset
     if (_tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) {
         constexpr static int delayed_s = 60;
-        for (auto rs_reader : _tablet_reader_params.rs_readers) {
+        for (auto rs_reader : _tablet_reader_params.rs_splits) {
             uint64_t delayed_expired_timestamp =
                     UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() +
                     delayed_s;
-            rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp);
-            StorageEngine::instance()->add_quering_rowset(rs_reader->rowset());
+            rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp(
+                    delayed_expired_timestamp);
+            StorageEngine::instance()->add_quering_rowset(rs_reader.rs_reader->rowset());
         }
     }
 
@@ -452,10 +449,10 @@ Status NewOlapScanner::_init_return_columns() {
 
 doris::TabletStorageType NewOlapScanner::get_storage_type() {
     int local_reader = 0;
-    for (const auto& reader : _tablet_reader_params.rs_readers) {
-        local_reader += reader->rowset()->is_local();
+    for (const auto& reader : _tablet_reader_params.rs_splits) {
+        local_reader += reader.rs_reader->rowset()->is_local();
     }
-    int total_reader = _tablet_reader_params.rs_readers.size();
+    int total_reader = _tablet_reader_params.rs_splits.size();
 
     if (local_reader == total_reader) {
         return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
@@ -491,7 +488,7 @@ Status NewOlapScanner::close(RuntimeState* state) {
     // readers will be release when runtime state deconstructed but
     // deconstructor in reader references runtime state
     // so that it will core
-    _tablet_reader_params.rs_readers.clear();
+    _tablet_reader_params.rs_splits.clear();
     _tablet_reader.reset();
 
     RETURN_IF_ERROR(VScanner::close(state));
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index f44e55ee37..a411056097 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -55,10 +55,12 @@ class NewOlapScanner : public VScanner {
 public:
     NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation,
                    const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
-                   const std::vector<RowsetReaderSharedPtr>& rs_readers,
-                   const std::vector<std::pair<int, int>>& rs_reader_seg_offsets,
                    RuntimeProfile* profile);
 
+    NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation,
+                   const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
+                   const std::vector<RowSetSplits>& rs_splits, RuntimeProfile* profile);
+
     Status init() override;
 
     Status open(RuntimeState* state) override;
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index bf513d78b9..b37459974c 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -60,28 +60,51 @@ BlockReader::~BlockReader() {
     }
 }
 
-bool BlockReader::_rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
+bool BlockReader::_rowsets_overlapping(const ReaderParams& read_params) {
     std::string cur_max_key;
-    for (const auto& rs_reader : rs_readers) {
+    const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
+    for (const auto& rs_split : rs_splits) {
         // version 0-1 of every tablet is empty, just skip this rowset
-        if (rs_reader->rowset()->version().second == 1) {
+        if (rs_split.rs_reader->rowset()->version().second == 1) {
             continue;
         }
-        if (rs_reader->rowset()->num_rows() == 0) {
+        if (rs_split.rs_reader->rowset()->num_rows() == 0) {
             continue;
         }
-        if (rs_reader->rowset()->is_segments_overlapping()) {
+        if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
             return true;
         }
         std::string min_key;
-        bool has_min_key = rs_reader->rowset()->min_key(&min_key);
+        bool has_min_key = rs_split.rs_reader->rowset()->min_key(&min_key);
         if (!has_min_key) {
             return true;
         }
         if (min_key <= cur_max_key) {
             return true;
         }
-        CHECK(rs_reader->rowset()->max_key(&cur_max_key));
+        CHECK(rs_split.rs_reader->rowset()->max_key(&cur_max_key));
+    }
+
+    for (const auto& rs_reader : rs_splits) {
+        // version 0-1 of every tablet is empty, just skip this rowset
+        if (rs_reader.rs_reader->rowset()->version().second == 1) {
+            continue;
+        }
+        if (rs_reader.rs_reader->rowset()->num_rows() == 0) {
+            continue;
+        }
+        if (rs_reader.rs_reader->rowset()->is_segments_overlapping()) {
+            return true;
+        }
+        std::string min_key;
+        bool has_min_key = rs_reader.rs_reader->rowset()->min_key(&min_key);
+        if (!has_min_key) {
+            return true;
+        }
+        if (min_key <= cur_max_key) {
+            return true;
+        }
+        CHECK(rs_reader.rs_reader->rowset()->max_key(&cur_max_key));
     }
     return false;
 }
@@ -96,34 +119,28 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
         return res;
     }
     // check if rowsets are noneoverlapping
-    _is_rowsets_overlapping = _rowsets_overlapping(read_params.rs_readers);
+    _is_rowsets_overlapping = _rowsets_overlapping(read_params);
     _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
-                        read_params.read_orderby_key_reverse,
-                        read_params.rs_readers_segment_offsets);
+                        read_params.read_orderby_key_reverse);
 
     _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;
     std::vector<RowsetReaderSharedPtr> valid_rs_readers;
-    DCHECK(read_params.rs_readers_segment_offsets.empty() ||
-           read_params.rs_readers_segment_offsets.size() == read_params.rs_readers.size());
 
-    bool is_empty = read_params.rs_readers_segment_offsets.empty();
-    for (int i = 0; i < read_params.rs_readers.size(); ++i) {
-        auto& rs_reader = read_params.rs_readers[i];
+    for (int i = 0; i < read_params.rs_splits.size(); ++i) {
+        auto& rs_split = read_params.rs_splits[i];
 
         // _vcollect_iter.topn_next() will init rs_reader by itself
         if (!_vcollect_iter.use_topn_next()) {
-            RETURN_IF_ERROR(rs_reader->init(
-                    &_reader_context,
-                    is_empty ? std::pair {0, 0} : read_params.rs_readers_segment_offsets[i]));
+            RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader_context, rs_split));
         }
 
-        Status res = _vcollect_iter.add_child(rs_reader);
+        Status res = _vcollect_iter.add_child(rs_split);
         if (!res.ok() && !res.is<END_OF_FILE>()) {
             LOG(WARNING) << "failed to add child to iterator, err=" << res;
             return res;
         }
         if (res.ok()) {
-            valid_rs_readers.push_back(rs_reader);
+            valid_rs_readers.push_back(rs_split.rs_reader);
         }
     }
 
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index bf9e20def4..0fe188419e 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -90,7 +90,7 @@ private:
 
     bool _get_next_row_same();
 
-    bool _rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers);
+    bool _rowsets_overlapping(const ReaderParams& read_params);
 
     VCollectIterator _vcollect_iter;
     IteratorRowRef _next_row {{}, -1, false};
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index bca58c34f8..78fad44fb7 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -65,8 +65,7 @@ VCollectIterator::~VCollectIterator() {
 }
 
 void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, bool force_merge,
-                            bool is_reverse,
-                            std::vector<std::pair<int, int>> rs_readers_segment_offsets) {
+                            bool is_reverse) {
     _reader = reader;
 
     // when aggregate is enabled or key_type is DUP_KEYS, we don't merge
@@ -92,21 +91,18 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo
          (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
           _reader->_tablet->enable_unique_key_merge_on_write()))) {
         _topn_limit = _reader->_reader_context.read_orderby_key_limit;
-        // When we use scanner pooling + query with topn_with_limit, we need it because we initialize our rs_reader
-        // in out method but not upstream user. At time we init readers, we will need to use it.
-        _rs_readers_segment_offsets = rs_readers_segment_offsets;
     } else {
         _topn_limit = 0;
     }
 }
 
-Status VCollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
+Status VCollectIterator::add_child(const RowSetSplits& rs_splits) {
     if (use_topn_next()) {
-        _rs_readers.push_back(rs_reader);
+        _rs_splits.push_back(rs_splits);
         return Status::OK();
     }
 
-    std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader, _reader));
+    std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_splits.rs_reader, _reader));
     _children.push_back(child.release());
     return Status::OK();
 }
@@ -288,23 +284,20 @@ Status VCollectIterator::_topn_next(Block* block) {
             row_pos_comparator);
 
     if (_is_reverse) {
-        std::reverse(_rs_readers.begin(), _rs_readers.end());
+        std::reverse(_rs_splits.begin(), _rs_splits.end());
     }
 
-    bool segment_empty = _rs_readers_segment_offsets.empty();
-    for (size_t i = 0; i < _rs_readers.size(); i++) {
-        const auto& rs_reader = _rs_readers[i];
+    for (size_t i = 0; i < _rs_splits.size(); i++) {
+        const auto& rs_split = _rs_splits[i];
         // init will prune segment by _reader_context.conditions and _reader_context.runtime_conditions
-        RETURN_IF_ERROR(
-                rs_reader->init(&_reader->_reader_context,
-                                segment_empty ? std::pair {0, 0} : _rs_readers_segment_offsets[i]));
+        RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader->_reader_context, rs_split));
 
         // read _topn_limit rows from this rs
         size_t read_rows = 0;
         bool eof = false;
         while (read_rows < _topn_limit && !eof) {
             block->clear_column_data();
-            auto status = rs_reader->next_block(block);
+            auto status = rs_split.rs_reader->next_block(block);
             if (!status.ok()) {
                 if (status.is<END_OF_FILE>()) {
                     eof = true;
diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h
index 449e1e05d2..7faa47ac2b 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -53,10 +53,9 @@ public:
     // Hold reader point to get reader params
     ~VCollectIterator();
 
-    void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse,
-              std::vector<std::pair<int, int>> rs_readers_segment_offsets);
+    void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse);
 
-    Status add_child(RowsetReaderSharedPtr rs_reader);
+    Status add_child(const RowSetSplits& rs_splits);
 
     Status build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers);
     // Get top row of the heap, nullptr if reach end.
@@ -330,9 +329,7 @@ private:
     // for topn next
     size_t _topn_limit = 0;
     bool _topn_eof = false;
-    // when we use scanner pooling + query with topn_with_limit, we use it.
-    std::vector<RowsetReaderSharedPtr> _rs_readers;
-    std::vector<std::pair<int, int>> _rs_readers_segment_offsets;
+    std::vector<RowSetSplits> _rs_splits;
 
     // Hold reader point to access read params, such as fetch conditions.
     TabletReader* _reader = nullptr;
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index 09757226c3..3023f28408 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -63,23 +63,23 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
         return res;
     }
     _reader_context.is_vertical_compaction = true;
-    for (auto& rs_reader : read_params.rs_readers) {
+    for (auto& rs_split : read_params.rs_splits) {
         // segment iterator will be inited here
         // In vertical compaction, every group will load segment so we should cache
         // segment to avoid tot many s3 head request
-        RETURN_IF_ERROR(
-                rs_reader->get_segment_iterators(&_reader_context, segment_iters, {0, 0}, true));
+        RETURN_IF_ERROR(rs_split.rs_reader->get_segment_iterators(&_reader_context, segment_iters,
+                                                                  {}, true));
         // if segments overlapping, all segment iterator should be inited in
         // heap merge iterator. If segments are none overlapping, only first segment of this
         // rowset will be inited and push to heap, other segment will be inited later when current
         // segment reached it's end.
         // Use this iterator_init_flag so we can load few segments in HeapMergeIterator to save memory
-        if (rs_reader->rowset()->is_segments_overlapping()) {
-            for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
+        if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
+            for (int i = 0; i < rs_split.rs_reader->rowset()->num_segments(); ++i) {
                 iterator_init_flag->push_back(true);
             }
         } else {
-            for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
+            for (int i = 0; i < rs_split.rs_reader->rowset()->num_segments(); ++i) {
                 if (i == 0) {
                     iterator_init_flag->push_back(true);
                     continue;
@@ -87,10 +87,10 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
                 iterator_init_flag->push_back(false);
             }
         }
-        for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
-            rowset_ids->push_back(rs_reader->rowset()->rowset_id());
+        for (int i = 0; i < rs_split.rs_reader->rowset()->num_segments(); ++i) {
+            rowset_ids->push_back(rs_split.rs_reader->rowset()->rowset_id());
         }
-        rs_reader->reset_read_options();
+        rs_split.rs_reader->reset_read_options();
     }
     return Status::OK();
 }
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index 3ae55f9743..838db9b0d6 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -278,13 +278,13 @@ TEST_F(TestTablet, pad_rowset) {
     _tablet->init();
 
     Version version(5, 5);
-    std::vector<RowsetReaderSharedPtr> readers;
-    ASSERT_FALSE(_tablet->capture_rs_readers(version, &readers).ok());
-    readers.clear();
+    std::vector<RowSetSplits> splits;
+    ASSERT_FALSE(_tablet->capture_rs_readers(version, &splits).ok());
+    splits.clear();
 
     PadRowsetAction action(nullptr, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN);
     action._pad_rowset(_tablet, version);
-    ASSERT_TRUE(_tablet->capture_rs_readers(version, &readers).ok());
+    ASSERT_TRUE(_tablet->capture_rs_readers(version, &splits).ok());
 }
 
 TEST_F(TestTablet, cooldown_policy) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to