This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0-tmp-load-seg-memory
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0-tmp-load-seg-memory 
by this push:
     new 9347d3cd62f [improvement](segment) reduce memory usage when open 
segments (#46570)
9347d3cd62f is described below

commit 9347d3cd62fa5b608d4d9179ac31d7b4e48f4df6
Author: TengJianPing <tengjianp...@selectdb.com>
AuthorDate: Thu Jan 9 15:00:37 2025 +0800

    [improvement](segment) reduce memory usage when open segments (#46570)
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    When a rowset contains many segments, opening all of them at once
    consumes a large amount of memory. This PR opens segments one by one
    and releases each `Segment` object immediately if it does not need to
    be kept for later use, which reduces the memory footprint dramatically.
---
 be/src/olap/parallel_scanner_builder.cpp           | 18 ++---
 be/src/olap/rowset/beta_rowset.cpp                 | 21 ++++-
 be/src/olap/rowset/beta_rowset.h                   |  7 +-
 be/src/olap/rowset/beta_rowset_reader.cpp          | 90 ++++++----------------
 be/src/olap/rowset/beta_rowset_reader.h            |  8 +-
 be/src/olap/rowset/rowset.cpp                      |  2 -
 be/src/olap/rowset/rowset.h                        |  3 -
 be/src/olap/rowset/rowset_reader.h                 |  6 +-
 be/src/olap/rowset/rowset_reader_context.h         |  3 +-
 .../segment_v2/lazy_init_segment_iterator.cpp      | 23 +++++-
 .../rowset/segment_v2/lazy_init_segment_iterator.h | 12 ++-
 be/src/olap/segment_loader.cpp                     | 58 ++++++++------
 be/src/olap/segment_loader.h                       |  7 ++
 be/test/olap/ordered_data_compaction_test.cpp      |  2 -
 be/test/olap/rowid_conversion_test.cpp             |  3 +-
 be/test/olap/segcompaction_mow_test.cpp            |  9 +--
 be/test/olap/segcompaction_test.cpp                | 18 ++---
 be/test/testutil/mock_rowset.h                     |  4 -
 be/test/vec/olap/vertical_compaction_test.cpp      | 12 ---
 19 files changed, 141 insertions(+), 165 deletions(-)

diff --git a/be/src/olap/parallel_scanner_builder.cpp 
b/be/src/olap/parallel_scanner_builder.cpp
index 88c69ab5c9a..769abe4a946 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -70,7 +70,7 @@ Status 
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
                 continue;
             }
 
-            int segment_start = 0;
+            int64_t segment_start = 0;
             auto split = RowSetSplits(reader->clone());
 
             for (size_t i = 0; i != segments_rows.size(); ++i) {
@@ -171,22 +171,18 @@ Status ParallelScannerBuilder::_load() {
         if (!_state->skip_delete_predicate()) {
             read_source.fill_delete_predicates();
         }
-        bool enable_segment_cache = 
_state->query_options().__isset.enable_segment_cache
-                                            ? 
_state->query_options().enable_segment_cache
-                                            : true;
 
         for (auto& rs_split : read_source.rs_splits) {
             auto rowset = rs_split.rs_reader->rowset();
             RETURN_IF_ERROR(rowset->load());
             const auto rowset_id = rowset->rowset_id();
-            SegmentCacheHandle segment_cache_handle;
 
-            RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
-                    std::dynamic_pointer_cast<BetaRowset>(rowset), 
&segment_cache_handle,
-                    enable_segment_cache, false));
-
-            for (const auto& segment : segment_cache_handle.get_segments()) {
-                
_all_segments_rows[rowset_id].emplace_back(segment->num_rows());
+            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+            std::vector<uint32_t> segment_rows;
+            RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows));
+            auto segment_count = rowset->num_segments();
+            for (int64_t i = 0; i != segment_count; i++) {
+                _all_segments_rows[rowset_id].emplace_back(segment_rows[i]);
             }
             _total_rows += rowset->num_rows();
         }
diff --git a/be/src/olap/rowset/beta_rowset.cpp 
b/be/src/olap/rowset/beta_rowset.cpp
index 9f33f363e99..40db8226f50 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -42,6 +42,7 @@
 #include "olap/rowset/segment_v2/inverted_index_cache.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/inverted_index_file_reader.h"
+#include "olap/segment_loader.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
 #include "util/crc32c.h"
@@ -68,9 +69,23 @@ Status BetaRowset::init() {
     return Status::OK(); // no op
 }
 
-Status BetaRowset::do_load(bool /*use_cache*/) {
-    // do nothing.
-    // the segments in this rowset will be loaded by calling load_segments() 
explicitly.
+Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows) {
+    DCHECK(_rowset_state_machine.rowset_state() == ROWSET_LOADED);
+
+    RETURN_IF_ERROR(_load_segment_rows_once.call([this] {
+        auto segment_count = num_segments();
+        _segments_rows.resize(segment_count);
+        for (int64_t i = 0; i != segment_count; ++i) {
+            SegmentCacheHandle segment_cache_handle;
+            RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
+                    std::static_pointer_cast<BetaRowset>(shared_from_this()), 
i,
+                    &segment_cache_handle, false, false));
+            const auto& tmp_segments = segment_cache_handle.get_segments();
+            _segments_rows[i] = tmp_segments[0]->num_rows();
+        }
+        return Status::OK();
+    }));
+    segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 0b22d122741..4b1388a1f08 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -89,6 +89,8 @@ public:
     Status show_nested_index_file(rapidjson::Value* rowset_value,
                                   rapidjson::Document::AllocatorType& 
allocator);
 
+    Status get_segment_num_rows(std::vector<uint32_t>* segment_rows);
+
 protected:
     BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& 
rowset_meta,
                std::string tablet_path);
@@ -96,8 +98,6 @@ protected:
     // init segment groups
     Status init() override;
 
-    Status do_load(bool use_cache) override;
-
     void do_close() override;
 
     Status check_current_rowset_segment() override;
@@ -107,6 +107,9 @@ protected:
 private:
     friend class RowsetFactory;
     friend class BetaRowsetReader;
+
+    DorisCallOnce<Status> _load_segment_rows_once;
+    std::vector<uint32_t> _segments_rows;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index d690b9b58d5..4ff7169e055 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -215,7 +215,6 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
         _read_options.io_ctx.expiration_time = 0;
     }
 
-    // load segments
     bool enable_segment_cache = true;
     auto* state = read_context->runtime_state;
     if (state != nullptr) {
@@ -226,75 +225,40 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     // When reader type is for query, session variable `enable_segment_cache` 
should be respected.
     bool should_use_cache = use_cache || (_read_context->reader_type == 
ReaderType::READER_QUERY &&
                                           enable_segment_cache);
-    SegmentCacheHandle segment_cache_handle;
-    {
-        SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns);
-        RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
-                _rowset, &segment_cache_handle, should_use_cache,
-                /*need_load_pk_index_and_bf*/ false));
-    }
-
-    // create iterator for each segment
-    auto& segments = segment_cache_handle.get_segments();
-    _segments_rows.resize(segments.size());
-    for (size_t i = 0; i < segments.size(); i++) {
-        _segments_rows[i] = segments[i]->num_rows();
-    }
-    if (_read_context->record_rowids) {
-        // init segment rowid map for rowid conversion
-        std::vector<uint32_t> segment_num_rows;
-        RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows));
-        
_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), 
segment_num_rows);
-    }
 
+    auto segment_count = _rowset->num_segments();
     auto [seg_start, seg_end] = _segment_offsets;
+    // If seg_start == seg_end, it means that the segments of a rowset is not
+    // split scanned by multiple scanners, and the rowset reader is used to 
read the whole rowset.
     if (seg_start == seg_end) {
         seg_start = 0;
-        seg_end = segments.size();
+        seg_end = segment_count;
+    }
+    if (_read_context->record_rowids && _read_context->rowid_conversion) {
+        // init segment rowid map for rowid conversion
+        std::vector<uint32_t> segment_rows;
+        RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows));
+        
_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), 
segment_rows);
     }
 
-    const bool is_merge_iterator = _is_merge_iterator();
-    const bool use_lazy_init_iterators =
-            !is_merge_iterator && _read_context->reader_type == 
ReaderType::READER_QUERY;
-    for (int i = seg_start; i < seg_end; i++) {
+    for (int64_t i = seg_start; i < seg_end; i++) {
         SCOPED_RAW_TIMER(&_stats->rowset_reader_create_iterators_timer_ns);
-        auto& seg_ptr = segments[i];
         std::unique_ptr<RowwiseIterator> iter;
 
-        if (use_lazy_init_iterators) {
-            /// For non-merging iterators, we don't need to initialize them 
all at once when creating them.
-            /// Instead, we should initialize each iterator separately when 
really using them.
-            /// This optimization minimizes the lifecycle of resources like 
column readers
-            /// and prevents excessive memory consumption, especially for wide 
tables.
-            if (_segment_row_ranges.empty()) {
-                _read_options.row_ranges.clear();
-                iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, 
_input_schema,
-                                                                 
_read_options);
-            } else {
-                DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
-                auto local_options = _read_options;
-                local_options.row_ranges = _segment_row_ranges[i - seg_start];
-                iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, 
_input_schema,
-                                                                 
local_options);
-            }
+        /// For iterators, we don't need to initialize them all at once when 
creating them.
+        /// Instead, we should initialize each iterator separately when really 
using them.
+        /// This optimization minimizes the lifecycle of resources like column 
readers
+        /// and prevents excessive memory consumption, especially for wide 
tables.
+        if (_segment_row_ranges.empty()) {
+            _read_options.row_ranges.clear();
+            iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, 
should_use_cache,
+                                                             _input_schema, 
_read_options);
         } else {
-            Status status;
-            /// If `_segment_row_ranges` is empty, the segment is not split.
-            if (_segment_row_ranges.empty()) {
-                _read_options.row_ranges.clear();
-                status = seg_ptr->new_iterator(_input_schema, _read_options, 
&iter);
-            } else {
-                DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
-                auto local_options = _read_options;
-                local_options.row_ranges = _segment_row_ranges[i - seg_start];
-                status = seg_ptr->new_iterator(_input_schema, local_options, 
&iter);
-            }
-
-            if (!status.ok()) {
-                LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
-                             << "]: " << status.to_string();
-                return Status::Error<ROWSET_READER_INIT>(status.to_string());
-            }
+            DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
+            auto local_options = _read_options;
+            local_options.row_ranges = _segment_row_ranges[i - seg_start];
+            iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, 
should_use_cache,
+                                                             _input_schema, 
local_options);
         }
 
         if (iter->empty()) {
@@ -422,10 +386,4 @@ bool 
BetaRowsetReader::_should_push_down_value_predicates() const {
              _read_context->sequence_id_idx == -1) ||
             _read_context->enable_unique_key_merge_on_write);
 }
-
-Status BetaRowsetReader::get_segment_num_rows(std::vector<uint32_t>* 
segment_num_rows) {
-    segment_num_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
-    return Status::OK();
-}
-
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index 33b2fb6a58c..b191480f7c7 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -80,8 +80,6 @@ public:
         return _iterator->current_block_row_locations(locations);
     }
 
-    Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) 
override;
-
     bool update_profile(RuntimeProfile* profile) override;
 
     RowsetReaderSharedPtr clone() override;
@@ -97,7 +95,7 @@ private:
                _rowset->rowset_meta()->is_segments_overlapping() && 
_get_segment_num() > 1;
     }
 
-    int32_t _get_segment_num() const {
+    int64_t _get_segment_num() const {
         auto [seg_start, seg_end] = _segment_offsets;
         if (seg_start == seg_end) {
             seg_start = 0;
@@ -108,7 +106,7 @@ private:
 
     DorisCallOnce<Status> _init_iter_once;
 
-    std::pair<int, int> _segment_offsets;
+    std::pair<int64_t, int64_t> _segment_offsets;
     std::vector<RowRanges> _segment_row_ranges;
 
     SchemaSPtr _input_schema;
@@ -120,8 +118,6 @@ private:
 
     std::unique_ptr<RowwiseIterator> _iterator;
 
-    std::vector<uint32_t> _segments_rows;
-
     StorageReadOptions _read_options;
 
     bool _empty = false;
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index ac3a2a7a1dc..0fd8e60f7ce 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -67,8 +67,6 @@ Status Rowset::load(bool use_cache) {
         std::lock_guard load_lock(_lock);
         // after lock, if rowset state is ROWSET_UNLOADING, it is ok to return
         if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
-            // first do load, then change the state
-            RETURN_IF_ERROR(do_load(use_cache));
             RETURN_IF_ERROR(_rowset_state_machine.on_load());
         }
     }
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index be21f29888e..01f321728f1 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -323,9 +323,6 @@ protected:
     // this is non-public because all clients should use RowsetFactory to 
obtain pointer to initialized Rowset
     virtual Status init() = 0;
 
-    // The actual implementation of load(). Guaranteed by to called exactly 
once.
-    virtual Status do_load(bool use_cache) = 0;
-
     // release resources in this api
     virtual void do_close() = 0;
 
diff --git a/be/src/olap/rowset/rowset_reader.h 
b/be/src/olap/rowset/rowset_reader.h
index 58c0f592b9c..6c637f47cc1 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -40,7 +40,7 @@ struct RowSetSplits {
     // if segment_offsets is not empty, means we only scan
     // [pair.first, pair.second) segment in rs_reader, only effective in dup 
key
     // and pipeline
-    std::pair<int, int> segment_offsets;
+    std::pair<int64_t, int64_t> segment_offsets;
 
     // RowRanges of each segment.
     std::vector<RowRanges> segment_row_ranges;
@@ -83,10 +83,6 @@ public:
         return Status::NotSupported("to be implemented");
     }
 
-    virtual Status get_segment_num_rows(std::vector<uint32_t>* 
segment_num_rows) {
-        return Status::NotSupported("to be implemented");
-    }
-
     virtual bool update_profile(RuntimeProfile* profile) = 0;
 
     virtual RowsetReaderSharedPtr clone() = 0;
diff --git a/be/src/olap/rowset/rowset_reader_context.h 
b/be/src/olap/rowset/rowset_reader_context.h
index fd3b4fed56f..2dd71328902 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -76,8 +76,7 @@ struct RowsetReaderContext {
     bool enable_unique_key_merge_on_write = false;
     const DeleteBitmap* delete_bitmap = nullptr;
     bool record_rowids = false;
-    RowIdConversion* rowid_conversion;
-    bool is_vertical_compaction = false;
+    RowIdConversion* rowid_conversion = nullptr;
     bool is_key_column_group = false;
     const std::set<int32_t>* output_columns = nullptr;
     RowsetId rowset_id;
diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
index d70df5a7bae..77e2310fc48 100644
--- a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
@@ -17,11 +17,18 @@
 
 #include "olap/rowset/segment_v2/lazy_init_segment_iterator.h"
 
+#include "olap/segment_loader.h"
+
 namespace doris::segment_v2 {
 
-LazyInitSegmentIterator::LazyInitSegmentIterator(std::shared_ptr<Segment> 
segment,
-                                                 SchemaSPtr schema, const 
StorageReadOptions& opts)
-        : _schema(std::move(schema)), _segment(std::move(segment)), 
_read_options(opts) {}
+LazyInitSegmentIterator::LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, 
int64_t segment_id,
+                                                 bool should_use_cache, 
SchemaSPtr schema,
+                                                 const StorageReadOptions& 
opts)
+        : _rowset(std::move(rowset)),
+          _segment_id(segment_id),
+          _should_use_cache(should_use_cache),
+          _schema(std::move(schema)),
+          _read_options(opts) {}
 
 /// Here do not use the argument of `opts`,
 /// see where the iterator is created in 
`BetaRowsetReader::get_segment_iterators`
@@ -31,7 +38,15 @@ Status LazyInitSegmentIterator::init(const 
StorageReadOptions& /*opts*/) {
         return Status::OK();
     }
 
-    RETURN_IF_ERROR(_segment->new_iterator(_schema, _read_options, 
&_inner_iterator));
+    std::shared_ptr<Segment> segment;
+    {
+        SegmentCacheHandle segment_cache_handle;
+        RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
+                _rowset, _segment_id, &segment_cache_handle, 
_should_use_cache, false));
+        const auto& tmp_segments = segment_cache_handle.get_segments();
+        segment = tmp_segments[0];
+    }
+    RETURN_IF_ERROR(segment->new_iterator(_schema, _read_options, 
&_inner_iterator));
     return _inner_iterator->init(_read_options);
 }
 
diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h 
b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
index 923c540c456..c31918d092c 100644
--- a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
@@ -22,14 +22,18 @@
 #include "olap/rowset/segment_v2/segment_iterator.h"
 #include "vec/core/block.h"
 
+namespace doris {
+class BetaRowset;
+using BetaRowsetSharedPtr = std::shared_ptr<BetaRowset>;
+}; // namespace doris
 namespace doris::segment_v2 {
 
 using namespace vectorized;
 
 class LazyInitSegmentIterator : public RowwiseIterator {
 public:
-    LazyInitSegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr 
schema,
-                            const StorageReadOptions& opts);
+    LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, int64_t segment_id, 
bool should_use_cache,
+                            SchemaSPtr schema, const StorageReadOptions& opts);
 
     ~LazyInitSegmentIterator() override = default;
 
@@ -59,8 +63,10 @@ public:
 
 private:
     bool _need_lazy_init {true};
+    BetaRowsetSharedPtr _rowset;
+    int64_t _segment_id {-1};
+    bool _should_use_cache {false};
     SchemaSPtr _schema = nullptr;
-    std::shared_ptr<Segment> _segment;
     StorageReadOptions _read_options;
     RowwiseIteratorUPtr _inner_iterator;
 };
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 26ac54c699b..0501e604df2 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -52,6 +52,38 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {
     LRUCachePolicy::erase(key.encode());
 }
 
+Status SegmentLoader::load_segment(const BetaRowsetSharedPtr& rowset, int64_t 
segment_id,
+                                   SegmentCacheHandle* cache_handle, bool 
use_cache,
+                                   bool need_load_pk_index_and_bf,
+                                   OlapReaderStatistics* index_load_stats) {
+    SegmentCache::CacheKey cache_key(rowset->rowset_id(), segment_id);
+    if (_segment_cache->lookup(cache_key, cache_handle)) {
+        // Has to check the segment status here, because the segment in cache 
may has something wrong during
+        // load index or create column reader.
+        // Not merge this if logic with previous to make the logic more clear.
+        if (cache_handle->pop_unhealthy_segment() == nullptr) {
+            return Status::OK();
+        }
+    }
+    // If the segment is not healthy, then will create a new segment and will 
replace the unhealthy one in SegmentCache.
+    segment_v2::SegmentSharedPtr segment;
+    RETURN_IF_ERROR(rowset->load_segment(segment_id, &segment));
+    if (need_load_pk_index_and_bf) {
+        RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats));
+    }
+    if (use_cache && !config::disable_segment_cache) {
+        // memory of SegmentCache::CacheValue will be handled by SegmentCache
+        auto* cache_value = new SegmentCache::CacheValue();
+        _cache_mem_usage += segment->meta_mem_usage();
+        cache_value->segment = std::move(segment);
+        _segment_cache->insert(cache_key, *cache_value, cache_handle);
+    } else {
+        cache_handle->push_segment(std::move(segment));
+    }
+
+    return Status::OK();
+}
+
 Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
                                     SegmentCacheHandle* cache_handle, bool 
use_cache,
                                     bool need_load_pk_index_and_bf,
@@ -60,30 +92,8 @@ Status SegmentLoader::load_segments(const 
BetaRowsetSharedPtr& rowset,
         return Status::OK();
     }
     for (int64_t i = 0; i < rowset->num_segments(); i++) {
-        SegmentCache::CacheKey cache_key(rowset->rowset_id(), i);
-        if (_segment_cache->lookup(cache_key, cache_handle)) {
-            // Has to check the segment status here, because the segment in 
cache may has something wrong during
-            // load index or create column reader.
-            // Not merge this if logic with previous to make the logic more 
clear.
-            if (cache_handle->pop_unhealthy_segment() == nullptr) {
-                continue;
-            }
-        }
-        // If the segment is not healthy, then will create a new segment and 
will replace the unhealthy one in SegmentCache.
-        segment_v2::SegmentSharedPtr segment;
-        RETURN_IF_ERROR(rowset->load_segment(i, &segment));
-        if (need_load_pk_index_and_bf) {
-            RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats));
-        }
-        if (use_cache && !config::disable_segment_cache) {
-            // memory of SegmentCache::CacheValue will be handled by 
SegmentCache
-            auto* cache_value = new SegmentCache::CacheValue();
-            _cache_mem_usage += segment->meta_mem_usage();
-            cache_value->segment = std::move(segment);
-            _segment_cache->insert(cache_key, *cache_value, cache_handle);
-        } else {
-            cache_handle->push_segment(std::move(segment));
-        }
+        RETURN_IF_ERROR(load_segment(rowset, i, cache_handle, use_cache, 
need_load_pk_index_and_bf,
+                                     index_load_stats));
     }
     cache_handle->set_inited();
     return Status::OK();
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 834906da93b..57418ab7b78 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -120,6 +120,13 @@ public:
                          bool use_cache = false, bool 
need_load_pk_index_and_bf = false,
                          OlapReaderStatistics* index_load_stats = nullptr);
 
+    // Load one segment of "rowset", return the "cache_handle" which contains 
segments.
+    // If use_cache is true, it will be loaded from _cache.
+    Status load_segment(const BetaRowsetSharedPtr& rowset, int64_t segment_id,
+                        SegmentCacheHandle* cache_handle, bool use_cache = 
false,
+                        bool need_load_pk_index_and_bf = false,
+                        OlapReaderStatistics* index_load_stats = nullptr);
+
     void erase_segment(const SegmentCache::CacheKey& key);
 
     void erase_segments(const RowsetId& rowset_id, int64_t num_segments);
diff --git a/be/test/olap/ordered_data_compaction_test.cpp 
b/be/test/olap/ordered_data_compaction_test.cpp
index 934dfbef3ea..dbbe19fc475 100644
--- a/be/test/olap/ordered_data_compaction_test.cpp
+++ b/be/test/olap/ordered_data_compaction_test.cpp
@@ -530,8 +530,6 @@ TEST_F(OrderedDataCompactionTest, test_01) {
     EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
     EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
     EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * 
rows_per_segment);
-    std::vector<uint32_t> segment_num_rows;
-    
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
     // check vertical compaction result
     for (auto id = 0; id < output_data.size(); id++) {
         LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
diff --git a/be/test/olap/rowid_conversion_test.cpp 
b/be/test/olap/rowid_conversion_test.cpp
index d48d4150ad3..9221c93479b 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -382,8 +382,9 @@ protected:
         } while (s.ok());
         EXPECT_TRUE(s.is<END_OF_FILE>()) << s;
         EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
+        auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(out_rowset);
         std::vector<uint32_t> segment_num_rows;
-        
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
+        EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
         if (has_delete_handler) {
             // All keys less than 1000 are deleted by delete handler
             for (auto& item : output_data) {
diff --git a/be/test/olap/segcompaction_mow_test.cpp 
b/be/test/olap/segcompaction_mow_test.cpp
index 62a3232889d..5463de03f2b 100644
--- a/be/test/olap/segcompaction_mow_test.cpp
+++ b/be/test/olap/segcompaction_mow_test.cpp
@@ -239,7 +239,6 @@ protected:
         reader_context.stats = &_stats;
         reader_context.delete_bitmap = delete_bitmap.get();
 
-        std::vector<uint32_t> segment_num_rows;
         Status s;
 
         // without predicates
@@ -280,7 +279,9 @@ protected:
             EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
             EXPECT_EQ(rowset->rowset_meta()->num_rows(), expect_total_rows);
             EXPECT_EQ(num_rows_read, expect_total_rows - rows_mark_deleted);
-            
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+            std::vector<uint32_t> segment_num_rows;
+            
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
             size_t total_num_rows = 0;
             for (const auto& i : segment_num_rows) {
                 total_num_rows += i;
@@ -307,7 +308,6 @@ TEST_P(SegCompactionMoWTest, SegCompactionThenRead) {
     config::segcompaction_candidate_max_rows = 6000; // set threshold above
                                                      // rows_per_segment
     config::segcompaction_batch_size = 10;
-    std::vector<uint32_t> segment_num_rows;
     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
     uint32_t rows_mark_deleted = 0;
     { // write `num_segments * rows_per_segment` rows to rowset
@@ -413,7 +413,6 @@ TEST_F(SegCompactionMoWTest, 
SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
     uint32_t rows_mark_deleted = 0;
     uint32_t total_written_rows = 0;
-    std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
         create_rowset_writer_context(20048, tablet_schema, &writer_context);
@@ -641,7 +640,6 @@ TEST_F(SegCompactionMoWTest, 
SegCompactionInterleaveWithBig_OoOoO) {
     RowsetSharedPtr rowset;
     config::segcompaction_candidate_max_rows = 6000; // set threshold above
     config::segcompaction_batch_size = 5;
-    std::vector<uint32_t> segment_num_rows;
     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
     uint32_t rows_mark_deleted = 0;
     uint32_t total_written_rows = 0;
@@ -832,7 +830,6 @@ TEST_F(SegCompactionMoWTest, SegCompactionNotTrigger) {
     config::segcompaction_candidate_max_rows = 6000; // set threshold above
                                                      // rows_per_segment
     config::segcompaction_batch_size = 10;
-    std::vector<uint32_t> segment_num_rows;
     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
     uint32_t rows_mark_deleted = 0;
     { // write `num_segments * rows_per_segment` rows to rowset
diff --git a/be/test/olap/segcompaction_test.cpp 
b/be/test/olap/segcompaction_test.cpp
index 2457ff11b83..f33cc4e944b 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -291,7 +291,6 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
     config::segcompaction_candidate_max_rows = 6000; // set threshold above
                                                      // rows_per_segment
     config::segcompaction_batch_size = 10;
-    std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
         create_rowset_writer_context(10047, tablet_schema, &writer_context);
@@ -387,7 +386,9 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
             EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
             EXPECT_EQ(rowset->rowset_meta()->num_rows(), num_rows_read);
             EXPECT_EQ(num_rows_read, num_segments * rows_per_segment);
-            
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+            std::vector<uint32_t> segment_num_rows;
+            
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
             size_t total_num_rows = 0;
             for (const auto& i : segment_num_rows) {
                 total_num_rows += i;
@@ -406,7 +407,6 @@ TEST_F(SegCompactionTest, 
SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
     RowsetSharedPtr rowset;
     config::segcompaction_candidate_max_rows = 6000; // set threshold above
                                                      // rows_per_segment
-    std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
         create_rowset_writer_context(10048, tablet_schema, &writer_context);
@@ -561,7 +561,6 @@ TEST_F(SegCompactionTest, 
SegCompactionInterleaveWithBig_OoOoO) {
     RowsetSharedPtr rowset;
     config::segcompaction_candidate_max_rows = 6000; // set threshold above
     config::segcompaction_batch_size = 5;
-    std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
         create_rowset_writer_context(10049, tablet_schema, &writer_context);
@@ -693,7 +692,6 @@ TEST_F(SegCompactionTest, 
SegCompactionThenReadUniqueTableSmall) {
     config::segcompaction_candidate_max_rows = 6000; // set threshold above
                                                      // rows_per_segment
     config::segcompaction_batch_size = 3;
-    std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
         create_rowset_writer_context(10051, tablet_schema, &writer_context);
@@ -894,7 +892,9 @@ TEST_F(SegCompactionTest, 
SegCompactionThenReadUniqueTableSmall) {
             // duplicated keys between segments are counted duplicately
             // so actual read by rowset reader is less or equal to it
             EXPECT_GE(rowset->rowset_meta()->num_rows(), num_rows_read);
-            
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+            std::vector<uint32_t> segment_num_rows;
+            
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
             size_t total_num_rows = 0;
             for (const auto& i : segment_num_rows) {
                 total_num_rows += i;
@@ -927,7 +927,6 @@ TEST_F(SegCompactionTest, CreateSegCompactionWriter) {
     config::segcompaction_candidate_max_rows = 6000; // set threshold above
     // rows_per_segment
     config::segcompaction_batch_size = 3;
-    std::vector<uint32_t> segment_num_rows;
     {
         RowsetWriterContext writer_context;
         create_rowset_writer_context(10052, tablet_schema, &writer_context);
@@ -959,7 +958,6 @@ TEST_F(SegCompactionTest, 
SegCompactionThenReadAggTableSmall) {
     config::segcompaction_candidate_max_rows = 6000; // set threshold above
                                                      // rows_per_segment
     config::segcompaction_batch_size = 3;
-    std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
         create_rowset_writer_context(10052, tablet_schema, &writer_context);
@@ -1162,7 +1160,9 @@ TEST_F(SegCompactionTest, 
SegCompactionThenReadAggTableSmall) {
             // duplicated keys between segments are counted duplicately
             // so actual read by rowset reader is less or equal to it
             EXPECT_GE(rowset->rowset_meta()->num_rows(), num_rows_read);
-            
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+            std::vector<uint32_t> segment_num_rows;
+            
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
             size_t total_num_rows = 0;
             for (const auto& i : segment_num_rows) {
                 total_num_rows += i;
diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h
index 1d6638863df..78f8f71bee8 100644
--- a/be/test/testutil/mock_rowset.h
+++ b/be/test/testutil/mock_rowset.h
@@ -77,10 +77,6 @@ protected:
 
     Status init() override { return Status::NotSupported("MockRowset not 
support this method."); }
 
-    Status do_load(bool use_cache) override {
-        return Status::NotSupported("MockRowset not support this method.");
-    }
-
     void do_close() override {
         // Do nothing.
     }
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp 
b/be/test/vec/olap/vertical_compaction_test.cpp
index 4c4409a7506..dd6f6932efc 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -521,8 +521,6 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
     EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
     EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
     EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * 
rows_per_segment);
-    std::vector<uint32_t> segment_num_rows;
-    
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
     // check vertical compaction result
     for (auto id = 0; id < output_data.size(); id++) {
         LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
@@ -628,8 +626,6 @@ TEST_F(VerticalCompactionTest, 
TestDupWithoutKeyVerticalMerge) {
     EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
     EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
     EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * 
rows_per_segment);
-    std::vector<uint32_t> segment_num_rows;
-    
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
     // check vertical compaction result
     for (auto id = 0; id < output_data.size(); id++) {
         LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
@@ -736,8 +732,6 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
     EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
     EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
     EXPECT_EQ(output_data.size(), num_segments * rows_per_segment);
-    std::vector<uint32_t> segment_num_rows;
-    
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
     // check vertical compaction result
     for (auto id = 0; id < output_data.size(); id++) {
         LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
@@ -848,8 +842,6 @@ TEST_F(VerticalCompactionTest, 
TestDupKeyVerticalMergeWithDelete) {
     EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
     EXPECT_EQ(output_data.size(),
               num_input_rowset * num_segments * rows_per_segment - 
num_input_rowset * 100);
-    std::vector<uint32_t> segment_num_rows;
-    
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
     // All keys less than 1000 are deleted by delete handler
     for (auto& item : output_data) {
         ASSERT_GE(std::get<0>(item), 100);
@@ -951,8 +943,6 @@ TEST_F(VerticalCompactionTest, 
TestDupWithoutKeyVerticalMergeWithDelete) {
     EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
     EXPECT_EQ(output_data.size(),
               num_input_rowset * num_segments * rows_per_segment - 
num_input_rowset * 100);
-    std::vector<uint32_t> segment_num_rows;
-    
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
     // All keys less than 1000 are deleted by delete handler
     for (auto& item : output_data) {
         ASSERT_GE(std::get<0>(item), 100);
@@ -1042,8 +1032,6 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
     EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
     EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
     EXPECT_EQ(output_data.size(), num_segments * rows_per_segment);
-    std::vector<uint32_t> segment_num_rows;
-    
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
     // check vertical compaction result
     for (auto id = 0; id < output_data.size(); id++) {
         LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to