This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 96a4159f7372bd6a1f867e582521b2aed0fb4e0f Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Tue May 28 13:08:45 2024 +0800 [opt](scan) Use lazy-init for segment iterators and avoid caching all segments in the rowset reader (#35432) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> ## Further comments If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc... --- be/src/olap/rowset/beta_rowset_reader.cpp | 67 +++++++++++++++------- be/src/olap/rowset/beta_rowset_reader.h | 4 +- .../segment_v2/lazy_init_segment_iterator.cpp | 38 ++++++++++++ .../rowset/segment_v2/lazy_init_segment_iterator.h | 67 ++++++++++++++++++++++ be/src/olap/rowset/segment_v2/segment.cpp | 29 +++++++--- be/src/olap/rowset/segment_v2/segment.h | 7 +++ be/src/vec/olap/vgeneric_iterators.cpp | 7 ++- 7 files changed, 185 insertions(+), 34 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 71729f0a230..e2ff07e349b 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -38,6 +38,7 @@ #include "olap/row_cursor.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_reader_context.h" +#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/schema.h" #include "olap/schema_cache.h" @@ -240,38 +241,66 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context // load segments bool should_use_cache = use_cache || _read_context->reader_type == ReaderType::READER_QUERY; - RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &_segment_cache_handle, + SegmentCacheHandle segment_cache_handle; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &segment_cache_handle, should_use_cache)); // create iterator for each segment - auto& segments = _segment_cache_handle.get_segments(); + auto& segments = segment_cache_handle.get_segments(); + _segments_rows.resize(segments.size()); + for (size_t i = 0; i < segments.size(); i++) { + _segments_rows[i] = segments[i]->num_rows(); + } + auto [seg_start, seg_end] = _segment_offsets; if (seg_start == seg_end) { seg_start = 0; seg_end = segments.size(); } + const bool is_merge_iterator = _is_merge_iterator(); + const bool use_lazy_init_iterators = + !is_merge_iterator && _read_context->reader_type == ReaderType::READER_QUERY; for (int i = seg_start; i < seg_end; i++) { auto& seg_ptr = segments[i]; std::unique_ptr<RowwiseIterator> iter; - Status status; - /// If `_segment_row_ranges` is empty, the segment is not split. - if (_segment_row_ranges.empty()) { - _read_options.row_ranges.clear(); - status = seg_ptr->new_iterator(_input_schema, _read_options, &iter); + if (use_lazy_init_iterators) { + /// For non-merging iterators, we don't need to initialize them all at once when creating them. + /// Instead, we should initialize each iterator separately when really using them. + /// This optimization minimizes the lifecycle of resources like column readers + /// and prevents excessive memory consumption, especially for wide tables. + if (_segment_row_ranges.empty()) { + _read_options.row_ranges.clear(); + iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, _input_schema, + _read_options); + } else { + DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size()); + auto local_options = _read_options; + local_options.row_ranges = _segment_row_ranges[i - seg_start]; + iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, _input_schema, + local_options); + } } else { - DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size()); - auto local_options = _read_options; - local_options.row_ranges = _segment_row_ranges[i - seg_start]; - status = seg_ptr->new_iterator(_input_schema, local_options, &iter); - } + Status status; + /// If `_segment_row_ranges` is empty, the segment is not split. + if (_segment_row_ranges.empty()) { + _read_options.row_ranges.clear(); + status = seg_ptr->new_iterator(_input_schema, _read_options, &iter); + } else { + DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size()); + auto local_options = _read_options; + local_options.row_ranges = _segment_row_ranges[i - seg_start]; + status = seg_ptr->new_iterator(_input_schema, local_options, &iter); + } - if (!status.ok()) { - LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() - << "]: " << status.to_string(); - return Status::Error<ROWSET_READER_INIT>(status.to_string()); + if (!status.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() + << "]: " << status.to_string(); + return Status::Error<ROWSET_READER_INIT>(status.to_string()); + } } + if (iter->empty()) { continue; } @@ -379,11 +408,7 @@ bool BetaRowsetReader::_should_push_down_value_predicates() const { } Status BetaRowsetReader::get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) { - auto& seg_ptrs = _segment_cache_handle.get_segments(); - segment_num_rows->resize(seg_ptrs.size()); - for (size_t i = 0; i < seg_ptrs.size(); i++) { - (*segment_num_rows)[i] = seg_ptrs[i]->num_rows(); - } + segment_num_rows->assign(_segments_rows.cbegin(), _segments_rows.cend()); return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 6d0c4034c87..33b2fb6a58c 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -120,9 +120,7 @@ private: std::unique_ptr<RowwiseIterator> _iterator; - // make sure this handle is initialized and valid before - // reading data. - SegmentCacheHandle _segment_cache_handle; + std::vector<uint32_t> _segments_rows; StorageReadOptions _read_options; diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp new file mode 100644 index 00000000000..d70df5a7bae --- /dev/null +++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h" + +namespace doris::segment_v2 { + +LazyInitSegmentIterator::LazyInitSegmentIterator(std::shared_ptr<Segment> segment, + SchemaSPtr schema, const StorageReadOptions& opts) + : _schema(std::move(schema)), _segment(std::move(segment)), _read_options(opts) {} + +/// Here do not use the argument of `opts`, +/// see where the iterator is created in `BetaRowsetReader::get_segment_iterators` +Status LazyInitSegmentIterator::init(const StorageReadOptions& /*opts*/) { + _need_lazy_init = false; + if (_inner_iterator) { + return Status::OK(); + } + + RETURN_IF_ERROR(_segment->new_iterator(_schema, _read_options, &_inner_iterator)); + return _inner_iterator->init(_read_options); +} + +} // namespace doris::segment_v2 \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h new file mode 100644 index 00000000000..923c540c456 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/rowset/segment_v2/common.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_iterator.h" +#include "vec/core/block.h" + +namespace doris::segment_v2 { + +using namespace vectorized; + +class LazyInitSegmentIterator : public RowwiseIterator { +public: + LazyInitSegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema, + const StorageReadOptions& opts); + + ~LazyInitSegmentIterator() override = default; + + Status init(const StorageReadOptions& opts) override; + + Status next_batch(Block* block) override { + if (UNLIKELY(_need_lazy_init)) { + RETURN_IF_ERROR(init(_read_options)); + DCHECK(_inner_iterator != nullptr); + } + + return _inner_iterator->next_batch(block); + } + + const Schema& schema() const override { return *_schema; } + + Status current_block_row_locations(std::vector<RowLocation>* locations) override { + return _inner_iterator->current_block_row_locations(locations); + } + + bool update_profile(RuntimeProfile* profile) override { + if (_inner_iterator != nullptr) { + return _inner_iterator->update_profile(profile); + } + return false; + } + +private: + bool _need_lazy_init {true}; + SchemaSPtr _schema = nullptr; + std::shared_ptr<Segment> _segment; + StorageReadOptions _read_options; + RowwiseIteratorUPtr _inner_iterator; +}; +} // namespace doris::segment_v2 \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 7893028a397..c18eec3ad6e 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -104,16 +104,15 @@ Segment::~Segment() { } Status Segment::_open() { - SegmentFooterPB footer; - RETURN_IF_ERROR(_parse_footer(&footer)); - RETURN_IF_ERROR(_create_column_readers(footer)); - _pk_index_meta.reset(footer.has_primary_key_index_meta() - ? new PrimaryKeyIndexMetaPB(footer.primary_key_index_meta()) + _footer_pb = std::make_unique<SegmentFooterPB>(); + RETURN_IF_ERROR(_parse_footer(_footer_pb.get())); + _pk_index_meta.reset(_footer_pb->has_primary_key_index_meta() + ? new PrimaryKeyIndexMetaPB(_footer_pb->primary_key_index_meta()) : nullptr); // delete_bitmap_calculator_test.cpp // DCHECK(footer.has_short_key_index_page()); - _sk_index_page = footer.short_key_index_page(); - _num_rows = footer.num_rows(); + _sk_index_page = _footer_pb->short_key_index_page(); + _num_rows = _footer_pb->num_rows(); return Status::OK(); } @@ -134,6 +133,8 @@ Status Segment::_open_inverted_index() { Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_options, std::unique_ptr<RowwiseIterator>* iter) { + RETURN_IF_ERROR(_create_column_readers_once()); + read_options.stats->total_segment_number++; // trying to prune the current segment by segment-level zone map for (auto& entry : read_options.col_id_to_predicates) { @@ -387,6 +388,15 @@ vectorized::DataTypePtr Segment::get_data_type_of(vectorized::PathInDataPtr path // TODO support normal column type return nullptr; } + +Status Segment::_create_column_readers_once() { + return _create_column_readers_once_call.call([&] { + DCHECK(_footer_pb); + Defer defer([&]() { _footer_pb.reset(); }); + return _create_column_readers(*_footer_pb); + }); +} + Status Segment::_create_column_readers(const SegmentFooterPB& footer) { std::unordered_map<uint32_t, uint32_t> column_id_to_footer_ordinal; std::unordered_map<vectorized::PathInData, uint32_t, vectorized::PathInData::Hash> @@ -589,6 +599,8 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, Status Segment::new_column_iterator(const TabletColumn& tablet_column, std::unique_ptr<ColumnIterator>* iter, const StorageReadOptions* opt) { + RETURN_IF_ERROR(_create_column_readers_once()); + // init column iterator by path info if (tablet_column.has_path_info() || tablet_column.is_variant_type()) { return new_column_iterator_with_path(tablet_column, iter, opt); @@ -616,6 +628,7 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column, } Status Segment::new_column_iterator(int32_t unique_id, std::unique_ptr<ColumnIterator>* iter) { + RETURN_IF_ERROR(_create_column_readers_once()); ColumnIterator* it; RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it)); iter->reset(it); @@ -641,6 +654,7 @@ ColumnReader* Segment::_get_column_reader(const TabletColumn& col) { Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column, std::unique_ptr<BitmapIndexIterator>* iter) { + RETURN_IF_ERROR(_create_column_readers_once()); ColumnReader* reader = _get_column_reader(tablet_column); if (reader != nullptr && reader->has_bitmap_index()) { BitmapIndexIterator* it; @@ -655,6 +669,7 @@ Status Segment::new_inverted_index_iterator(const TabletColumn& tablet_column, const TabletIndex* index_meta, const StorageReadOptions& read_options, std::unique_ptr<InvertedIndexIterator>* iter) { + RETURN_IF_ERROR(_create_column_readers_once()); ColumnReader* reader = _get_column_reader(tablet_column); if (reader != nullptr && index_meta) { if (_inverted_index_file_reader == nullptr) { diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index e97a0389e21..269d5f86364 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -206,6 +206,8 @@ private: Status _load_index_impl(); Status _open_inverted_index(); + Status _create_column_readers_once(); + private: friend class SegmentIterator; io::FileReaderSPtr _file_reader; @@ -246,6 +248,11 @@ private: DorisCallOnce<Status> _load_index_once; // used to guarantee that primary key bloom filter will be loaded at most once in a thread-safe way DorisCallOnce<Status> _load_pk_bf_once; + + DorisCallOnce<Status> _create_column_readers_once_call; + + std::unique_ptr<SegmentFooterPB> _footer_pb; + // used to hold short key index page in memory PageHandle _sk_index_handle; // short key index decoder diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 4e3df66cd0f..d8a073fc11a 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -379,6 +379,7 @@ public: private: const Schema* _schema = nullptr; RowwiseIteratorUPtr _cur_iter = nullptr; + StorageReadOptions _read_options; std::vector<RowwiseIteratorUPtr> _origin_iters; }; @@ -392,10 +393,9 @@ Status VUnionIterator::init(const StorageReadOptions& opts) { // in the same order as the original segments. std::reverse(_origin_iters.begin(), _origin_iters.end()); - for (auto& iter : _origin_iters) { - RETURN_IF_ERROR(iter->init(opts)); - } + _read_options = opts; _cur_iter = std::move(_origin_iters.back()); + RETURN_IF_ERROR(_cur_iter->init(_read_options)); _schema = &_cur_iter->schema(); return Status::OK(); } @@ -407,6 +407,7 @@ Status VUnionIterator::next_batch(Block* block) { _origin_iters.pop_back(); if (!_origin_iters.empty()) { _cur_iter = std::move(_origin_iters.back()); + RETURN_IF_ERROR(_cur_iter->init(_read_options)); } else { _cur_iter = nullptr; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org