This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 96a4159f7372bd6a1f867e582521b2aed0fb4e0f
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Tue May 28 13:08:45 2024 +0800

    [opt](scan) Use lazy-init for segment iterators and avoid caching all 
segments in the rowset reader (#35432)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
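    For non-merge query reads, create segment iterators lazily so that each
    segment's iterator (and the column readers it needs) is only built when that
    segment is actually read. The rowset reader no longer keeps every segment
    alive through a cached segment handle; it only retains the per-segment row
    counts.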
    
    ## Further comments
    
    If this is a relatively large or complex change, kick off the discussion
    at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why
    you chose the solution you did and what alternatives you considered,
    etc.
---
 be/src/olap/rowset/beta_rowset_reader.cpp          | 67 +++++++++++++++-------
 be/src/olap/rowset/beta_rowset_reader.h            |  4 +-
 .../segment_v2/lazy_init_segment_iterator.cpp      | 38 ++++++++++++
 .../rowset/segment_v2/lazy_init_segment_iterator.h | 67 ++++++++++++++++++++++
 be/src/olap/rowset/segment_v2/segment.cpp          | 29 +++++++---
 be/src/olap/rowset/segment_v2/segment.h            |  7 +++
 be/src/vec/olap/vgeneric_iterators.cpp             |  7 ++-
 7 files changed, 185 insertions(+), 34 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 71729f0a230..e2ff07e349b 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -38,6 +38,7 @@
 #include "olap/row_cursor.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/rowset_reader_context.h"
+#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h"
 #include "olap/rowset/segment_v2/segment.h"
 #include "olap/schema.h"
 #include "olap/schema_cache.h"
@@ -240,38 +241,66 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
 
     // load segments
     bool should_use_cache = use_cache || _read_context->reader_type == 
ReaderType::READER_QUERY;
-    RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, 
&_segment_cache_handle,
+    SegmentCacheHandle segment_cache_handle;
+    RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, 
&segment_cache_handle,
                                                              
should_use_cache));
 
     // create iterator for each segment
-    auto& segments = _segment_cache_handle.get_segments();
+    auto& segments = segment_cache_handle.get_segments();
+    _segments_rows.resize(segments.size());
+    for (size_t i = 0; i < segments.size(); i++) {
+        _segments_rows[i] = segments[i]->num_rows();
+    }
+
     auto [seg_start, seg_end] = _segment_offsets;
     if (seg_start == seg_end) {
         seg_start = 0;
         seg_end = segments.size();
     }
 
+    const bool is_merge_iterator = _is_merge_iterator();
+    const bool use_lazy_init_iterators =
+            !is_merge_iterator && _read_context->reader_type == 
ReaderType::READER_QUERY;
     for (int i = seg_start; i < seg_end; i++) {
         auto& seg_ptr = segments[i];
         std::unique_ptr<RowwiseIterator> iter;
-        Status status;
 
-        /// If `_segment_row_ranges` is empty, the segment is not split.
-        if (_segment_row_ranges.empty()) {
-            _read_options.row_ranges.clear();
-            status = seg_ptr->new_iterator(_input_schema, _read_options, 
&iter);
+        if (use_lazy_init_iterators) {
+            /// For non-merging iterators, we don't need to initialize them 
all at once when creating them.
+            /// Instead, we should initialize each iterator separately when 
really using them.
+            /// This optimization minimizes the lifecycle of resources like 
column readers
+            /// and prevents excessive memory consumption, especially for wide 
tables.
+            if (_segment_row_ranges.empty()) {
+                _read_options.row_ranges.clear();
+                iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, 
_input_schema,
+                                                                 
_read_options);
+            } else {
+                DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
+                auto local_options = _read_options;
+                local_options.row_ranges = _segment_row_ranges[i - seg_start];
+                iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, 
_input_schema,
+                                                                 
local_options);
+            }
         } else {
-            DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
-            auto local_options = _read_options;
-            local_options.row_ranges = _segment_row_ranges[i - seg_start];
-            status = seg_ptr->new_iterator(_input_schema, local_options, 
&iter);
-        }
+            Status status;
+            /// If `_segment_row_ranges` is empty, the segment is not split.
+            if (_segment_row_ranges.empty()) {
+                _read_options.row_ranges.clear();
+                status = seg_ptr->new_iterator(_input_schema, _read_options, 
&iter);
+            } else {
+                DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
+                auto local_options = _read_options;
+                local_options.row_ranges = _segment_row_ranges[i - seg_start];
+                status = seg_ptr->new_iterator(_input_schema, local_options, 
&iter);
+            }
 
-        if (!status.ok()) {
-            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
-                         << "]: " << status.to_string();
-            return Status::Error<ROWSET_READER_INIT>(status.to_string());
+            if (!status.ok()) {
+                LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
+                             << "]: " << status.to_string();
+                return Status::Error<ROWSET_READER_INIT>(status.to_string());
+            }
         }
+
         if (iter->empty()) {
             continue;
         }
@@ -379,11 +408,7 @@ bool 
BetaRowsetReader::_should_push_down_value_predicates() const {
 }
 
 Status BetaRowsetReader::get_segment_num_rows(std::vector<uint32_t>* 
segment_num_rows) {
-    auto& seg_ptrs = _segment_cache_handle.get_segments();
-    segment_num_rows->resize(seg_ptrs.size());
-    for (size_t i = 0; i < seg_ptrs.size(); i++) {
-        (*segment_num_rows)[i] = seg_ptrs[i]->num_rows();
-    }
+    segment_num_rows->assign(_segments_rows.cbegin(), _segments_rows.cend()); // copies the per-segment row counts recorded by get_segment_iterators(); NOTE(review): presumably empty if that has not run yet
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index 6d0c4034c87..33b2fb6a58c 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -120,9 +120,7 @@ private:
 
     std::unique_ptr<RowwiseIterator> _iterator;
 
-    // make sure this handle is initialized and valid before
-    // reading data.
-    SegmentCacheHandle _segment_cache_handle;
+    std::vector<uint32_t> _segments_rows;
 
     StorageReadOptions _read_options;
 
diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
new file mode 100644
index 00000000000..d70df5a7bae
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h"
+
+namespace doris::segment_v2 {
+
+LazyInitSegmentIterator::LazyInitSegmentIterator(std::shared_ptr<Segment> segment,
+                                                 SchemaSPtr schema, const StorageReadOptions& opts)
+        : _schema(std::move(schema)), _segment(std::move(segment)), _read_options(opts) {}
+
+/// Creates and initializes the wrapped segment iterator on first use.
+///
+/// The `opts` argument is intentionally ignored: the options captured at construction
+/// time (which may carry per-segment `row_ranges`) are used instead; see where the
+/// iterator is created in `BetaRowsetReader::get_segment_iterators`.
+///
+/// The iterator is marked as initialized only after creating AND initializing the
+/// inner iterator both succeed. On failure the partially built inner iterator is
+/// discarded, so a later call retries (and reports the error again) rather than
+/// returning OK for an uninitialized iterator or letting `next_batch` dereference a
+/// null `_inner_iterator`.
+Status LazyInitSegmentIterator::init(const StorageReadOptions& /*opts*/) {
+    if (!_need_lazy_init) {
+        // Already initialized successfully (e.g. by a caller invoking init() eagerly).
+        return Status::OK();
+    }
+
+    RowwiseIteratorUPtr inner;
+    RETURN_IF_ERROR(_segment->new_iterator(_schema, _read_options, &inner));
+    RETURN_IF_ERROR(inner->init(_read_options));
+
+    // Publish only a fully initialized inner iterator.
+    _inner_iterator = std::move(inner);
+    _need_lazy_init = false;
+    return Status::OK();
+}
+
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h 
b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
new file mode 100644
index 00000000000..923c540c456
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/rowset/segment_v2/common.h"
+#include "olap/rowset/segment_v2/segment.h"
+#include "olap/rowset/segment_v2/segment_iterator.h"
+#include "vec/core/block.h"
+
+namespace doris::segment_v2 {
+
+using namespace vectorized;
+
+/// A RowwiseIterator that defers creating (and initializing) the real segment
+/// iterator until it is first used, i.e. on the first init() or next_batch() call.
+///
+/// Rowset readers create one of these per segment for non-merge query reads, so that
+/// resources owned by a segment iterator (such as column readers) are only held while
+/// that segment is actually being read, instead of for every segment at once.
+class LazyInitSegmentIterator : public RowwiseIterator {
+public:
+    /// @param segment  segment to read; shared ownership keeps it alive for this iterator
+    /// @param schema   read schema, returned by schema() even before initialization
+    /// @param opts     read options used to create and init the inner iterator
+    LazyInitSegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema,
+                            const StorageReadOptions& opts);
+
+    ~LazyInitSegmentIterator() override = default;
+
+    /// Creates and initializes the inner iterator. `opts` is ignored in favour of the
+    /// options captured by the constructor.
+    Status init(const StorageReadOptions& opts) override;
+
+    /// Reads the next block, initializing the inner iterator first if still needed.
+    Status next_batch(Block* block) override {
+        if (UNLIKELY(_need_lazy_init)) {
+            RETURN_IF_ERROR(init(_read_options));
+            DCHECK(_inner_iterator != nullptr);
+        }
+
+        return _inner_iterator->next_batch(block);
+    }
+
+    const Schema& schema() const override { return *_schema; }
+
+    /// Row locations of the block most recently returned by next_batch().
+    /// Returns an error instead of dereferencing a null inner iterator when called
+    /// before the iterator has been initialized (no block can have been read yet).
+    Status current_block_row_locations(std::vector<RowLocation>* locations) override {
+        if (UNLIKELY(_inner_iterator == nullptr)) {
+            return Status::InternalError("LazyInitSegmentIterator is not initialized");
+        }
+        return _inner_iterator->current_block_row_locations(locations);
+    }
+
+    /// Forwards to the inner iterator; returns false if it was never created.
+    bool update_profile(RuntimeProfile* profile) override {
+        if (_inner_iterator != nullptr) {
+            return _inner_iterator->update_profile(profile);
+        }
+        return false;
+    }
+
+private:
+    // Cleared by init(); while set, next_batch() calls init() before reading.
+    bool _need_lazy_init {true};
+    SchemaSPtr _schema = nullptr;
+    std::shared_ptr<Segment> _segment;
+    // Options captured at construction; used instead of the ones passed to init().
+    StorageReadOptions _read_options;
+    // Created by init(); null until then.
+    RowwiseIteratorUPtr _inner_iterator;
+};
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index 7893028a397..c18eec3ad6e 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -104,16 +104,15 @@ Segment::~Segment() {
 }
 
 Status Segment::_open() {
-    SegmentFooterPB footer;
-    RETURN_IF_ERROR(_parse_footer(&footer));
-    RETURN_IF_ERROR(_create_column_readers(footer));
-    _pk_index_meta.reset(footer.has_primary_key_index_meta()
-                                 ? new 
PrimaryKeyIndexMetaPB(footer.primary_key_index_meta())
+    _footer_pb = std::make_unique<SegmentFooterPB>(); // kept as a member: column readers are now built from it later by _create_column_readers_once(), which then releases it
+    RETURN_IF_ERROR(_parse_footer(_footer_pb.get())); // column readers are no longer created eagerly here
+    _pk_index_meta.reset(_footer_pb->has_primary_key_index_meta()
+                                 ? new 
PrimaryKeyIndexMetaPB(_footer_pb->primary_key_index_meta())
                                  : nullptr);
     // delete_bitmap_calculator_test.cpp
     // DCHECK(footer.has_short_key_index_page());
-    _sk_index_page = footer.short_key_index_page();
-    _num_rows = footer.num_rows();
+    _sk_index_page = _footer_pb->short_key_index_page();
+    _num_rows = _footer_pb->num_rows();
     return Status::OK();
 }
 
@@ -134,6 +133,8 @@ Status Segment::_open_inverted_index() {
 
 Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& 
read_options,
                              std::unique_ptr<RowwiseIterator>* iter) {
+    RETURN_IF_ERROR(_create_column_readers_once());
+
     read_options.stats->total_segment_number++;
     // trying to prune the current segment by segment-level zone map
     for (auto& entry : read_options.col_id_to_predicates) {
@@ -387,6 +388,15 @@ vectorized::DataTypePtr 
Segment::get_data_type_of(vectorized::PathInDataPtr path
     // TODO support normal column type
     return nullptr;
 }
+
+/// Builds the column readers from the footer retained by `_open()`, at most once
+/// (guarded by `_create_column_readers_once_call`). The retained footer is released
+/// after the attempt, whether or not building the readers succeeded.
+Status Segment::_create_column_readers_once() {
+    return _create_column_readers_once_call.call([this]() -> Status {
+        DCHECK(_footer_pb != nullptr);
+        // Runs on scope exit, i.e. after _create_column_readers() has returned.
+        Defer release_footer([this]() { _footer_pb.reset(); });
+        return _create_column_readers(*_footer_pb);
+    });
+}
+
 Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
     std::unordered_map<uint32_t, uint32_t> column_id_to_footer_ordinal;
     std::unordered_map<vectorized::PathInData, uint32_t, 
vectorized::PathInData::Hash>
@@ -589,6 +599,8 @@ Status Segment::new_column_iterator_with_path(const 
TabletColumn& tablet_column,
 Status Segment::new_column_iterator(const TabletColumn& tablet_column,
                                     std::unique_ptr<ColumnIterator>* iter,
                                     const StorageReadOptions* opt) {
+    RETURN_IF_ERROR(_create_column_readers_once());
+
     // init column iterator by path info
     if (tablet_column.has_path_info() || tablet_column.is_variant_type()) {
         return new_column_iterator_with_path(tablet_column, iter, opt);
@@ -616,6 +628,7 @@ Status Segment::new_column_iterator(const TabletColumn& 
tablet_column,
 }
 
 Status Segment::new_column_iterator(int32_t unique_id, 
std::unique_ptr<ColumnIterator>* iter) {
+    RETURN_IF_ERROR(_create_column_readers_once());
     ColumnIterator* it;
     RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it));
     iter->reset(it);
@@ -641,6 +654,7 @@ ColumnReader* Segment::_get_column_reader(const 
TabletColumn& col) {
 
 Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column,
                                           
std::unique_ptr<BitmapIndexIterator>* iter) {
+    RETURN_IF_ERROR(_create_column_readers_once());
     ColumnReader* reader = _get_column_reader(tablet_column);
     if (reader != nullptr && reader->has_bitmap_index()) {
         BitmapIndexIterator* it;
@@ -655,6 +669,7 @@ Status Segment::new_inverted_index_iterator(const 
TabletColumn& tablet_column,
                                             const TabletIndex* index_meta,
                                             const StorageReadOptions& 
read_options,
                                             
std::unique_ptr<InvertedIndexIterator>* iter) {
+    RETURN_IF_ERROR(_create_column_readers_once());
     ColumnReader* reader = _get_column_reader(tablet_column);
     if (reader != nullptr && index_meta) {
         if (_inverted_index_file_reader == nullptr) {
diff --git a/be/src/olap/rowset/segment_v2/segment.h 
b/be/src/olap/rowset/segment_v2/segment.h
index e97a0389e21..269d5f86364 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -206,6 +206,8 @@ private:
     Status _load_index_impl();
     Status _open_inverted_index();
 
+    Status _create_column_readers_once();
+
 private:
     friend class SegmentIterator;
     io::FileReaderSPtr _file_reader;
@@ -246,6 +248,11 @@ private:
     DorisCallOnce<Status> _load_index_once;
     // used to guarantee that primary key bloom filter will be loaded at most 
once in a thread-safe way
     DorisCallOnce<Status> _load_pk_bf_once;
+
+    DorisCallOnce<Status> _create_column_readers_once_call;
+
+    std::unique_ptr<SegmentFooterPB> _footer_pb;
+
     // used to hold short key index page in memory
     PageHandle _sk_index_handle;
     // short key index decoder
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp 
b/be/src/vec/olap/vgeneric_iterators.cpp
index 4e3df66cd0f..d8a073fc11a 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -379,6 +379,7 @@ public:
 private:
     const Schema* _schema = nullptr;
     RowwiseIteratorUPtr _cur_iter = nullptr;
+    StorageReadOptions _read_options;
     std::vector<RowwiseIteratorUPtr> _origin_iters;
 };
 
@@ -392,10 +393,9 @@ Status VUnionIterator::init(const StorageReadOptions& 
opts) {
     // in the same order as the original segments.
     std::reverse(_origin_iters.begin(), _origin_iters.end());
 
-    for (auto& iter : _origin_iters) {
-        RETURN_IF_ERROR(iter->init(opts));
-    }
+    _read_options = opts;
     _cur_iter = std::move(_origin_iters.back());
+    RETURN_IF_ERROR(_cur_iter->init(_read_options));
     _schema = &_cur_iter->schema();
     return Status::OK();
 }
@@ -407,6 +407,7 @@ Status VUnionIterator::next_batch(Block* block) {
             _origin_iters.pop_back();
             if (!_origin_iters.empty()) {
                 _cur_iter = std::move(_origin_iters.back());
+                RETURN_IF_ERROR(_cur_iter->init(_read_options));
             } else {
                 _cur_iter = nullptr;
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to