This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 06cbb96e79d [feature](merge-cloud) cloud meta mgr impl 
prepare/commit/sync rowset (#30128)
06cbb96e79d is described below

commit 06cbb96e79d2c782b3533f0f236da7db9ab3455f
Author: walter <w41te...@gmail.com>
AuthorDate: Tue Jan 23 18:56:54 2024 +0800

    [feature](merge-cloud) cloud meta mgr impl prepare/commit/sync rowset 
(#30128)
---
 be/src/cloud/cloud_meta_mgr.cpp                    | 345 ++++++++++++++++++++-
 be/src/cloud/cloud_meta_mgr.h                      |   2 +-
 be/src/cloud/cloud_tablet.cpp                      |  54 +---
 be/src/cloud/cloud_tablet.h                        |  22 +-
 be/src/common/status.h                             |   3 +-
 be/src/olap/base_tablet.cpp                        | 139 ++++++++-
 be/src/olap/base_tablet.h                          |  26 +-
 be/src/olap/compaction.cpp                         |  12 +-
 be/src/olap/rowset/rowset_meta.cpp                 |  18 +-
 be/src/olap/rowset/rowset_meta.h                   |   8 +-
 be/src/olap/rowset_builder.cpp                     |   2 +-
 be/src/olap/schema_change.cpp                      |   2 +-
 be/src/olap/single_replica_compaction.cpp          |   9 +-
 be/src/olap/snapshot_manager.cpp                   |   2 +-
 be/src/olap/tablet.cpp                             | 138 +--------
 be/src/olap/tablet.h                               |  23 +-
 be/src/olap/tablet_manager.cpp                     |   4 +-
 be/src/olap/tablet_meta.cpp                        |   4 +
 be/src/olap/tablet_meta.h                          |   6 +
 be/src/olap/task/engine_clone_task.cpp             |   7 +-
 be/src/olap/task/engine_storage_migration_task.cpp |   2 +-
 be/test/olap/delta_writer_test.cpp                 |  18 +-
 .../olap/engine_storage_migration_task_test.cpp    |   4 +-
 be/test/olap/remote_rowset_gc_test.cpp             |   4 +-
 be/test/olap/tablet_cooldown_test.cpp              |   4 +-
 25 files changed, 588 insertions(+), 270 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index d6eb54e5c41..eb2802b99b0 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -38,6 +38,7 @@
 #include "gen_cpp/cloud.pb.h"
 #include "gen_cpp/olap_file.pb.h"
 #include "olap/olap_common.h"
+#include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/tablet_meta.h"
 #include "runtime/stream_load/stream_load_context.h"
@@ -137,7 +138,7 @@ private:
 
         auto channel = std::make_unique<brpc::Channel>();
         Status s = init_channel(channel.get());
-        if (UNLIKELY(!s.ok())) {
+        if (!s.ok()) [[unlikely]] {
             return s;
         }
 
@@ -189,6 +190,8 @@ static std::string debug_info(const Request& req) {
         return fmt::format(" tablet_id={}", req.tablet_id());
     } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest>) {
         return "";
+    } else if constexpr (is_any_v<Request, CreateRowsetRequest>) {
+        return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id());
     } else {
         static_assert(!sizeof(Request));
     }
@@ -224,7 +227,7 @@ static Status retry_rpc(std::string_view op_name, const 
Request& req, Response*
         cntl.set_max_retry(BRPC_RETRY_TIMES);
         res->Clear();
         (stub.get()->*method)(&cntl, &req, res, nullptr);
-        if (UNLIKELY(cntl.Failed())) {
+        if (cntl.Failed()) [[unlikely]] {
             error_msg = cntl.ErrorText();
         } else if (res->status().code() == MetaServiceCode::OK) {
             return Status::OK();
@@ -271,28 +274,339 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, 
TabletMetaSharedPtr* tab
 }
 
 Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool 
warmup_delta_data) {
-    return Status::NotSupported("CloudMetaMgr::sync_tablet_rowsets is not 
implemented");
+    using namespace std::chrono;
+
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", 
Status::OK(), tablet);
+
+    std::shared_ptr<MetaService_Stub> stub;
+    RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+
+    int tried = 0;
+    while (true) {
+        brpc::Controller cntl;
+        cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
+        GetRowsetRequest req;
+        GetRowsetResponse resp;
+
+        int64_t tablet_id = tablet->tablet_id();
+        int64_t table_id = tablet->table_id();
+        int64_t index_id = tablet->index_id();
+        req.set_cloud_unique_id(config::cloud_unique_id);
+        auto* idx = req.mutable_idx();
+        idx->set_tablet_id(tablet_id);
+        idx->set_table_id(table_id);
+        idx->set_index_id(index_id);
+        idx->set_partition_id(tablet->partition_id());
+        {
+            std::shared_lock rlock(tablet->get_header_lock());
+            req.set_start_version(tablet->local_max_version() + 1);
+            req.set_base_compaction_cnt(tablet->base_compaction_cnt());
+            
req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
+            req.set_cumulative_point(tablet->cumulative_layer_point());
+        }
+        req.set_end_version(-1);
+        VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();
+
+        stub->get_rowset(&cntl, &req, &resp, nullptr);
+        int64_t latency = cntl.latency_us();
+        g_get_rowset_latency << latency;
+        int retry_times = config::meta_service_rpc_retry_times;
+        if (cntl.Failed()) {
+            if (tried++ < retry_times) {
+                auto rng = make_random_engine();
+                std::uniform_int_distribution<uint32_t> u(20, 200);
+                std::uniform_int_distribution<uint32_t> u1(500, 1000);
+                uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng);
+                std::this_thread::sleep_for(milliseconds(duration_ms));
+                LOG_INFO("failed to get rowset meta")
+                        .tag("reason", cntl.ErrorText())
+                        .tag("tablet_id", tablet_id)
+                        .tag("table_id", table_id)
+                        .tag("index_id", index_id)
+                        .tag("partition_id", tablet->partition_id())
+                        .tag("tried", tried)
+                        .tag("sleep", duration_ms);
+                continue;
+            }
+            return Status::RpcError("failed to get rowset meta: {}", 
cntl.ErrorText());
+        }
+        if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
+            return Status::NotFound("failed to get rowset meta: {}", 
resp.status().msg());
+        }
+        if (resp.status().code() != MetaServiceCode::OK) {
+            return Status::InternalError("failed to get rowset meta: {}", 
resp.status().msg());
+        }
+        if (latency > 100 * 1000) { // 100ms
+            LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << 
resp.rowset_meta().size()
+                      << ", latency=" << latency << "us";
+        } else {
+            LOG_EVERY_N(INFO, 100)
+                    << "finish get_rowset rpc. rowset_meta.size()=" << 
resp.rowset_meta().size()
+                    << ", latency=" << latency << "us";
+        }
+
+        int64_t now = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+        tablet->last_sync_time_s = now;
+
+        if (tablet->enable_unique_key_merge_on_write()) {
+            DeleteBitmap delete_bitmap(tablet_id);
+            int64_t old_max_version = req.start_version() - 1;
+            auto st = sync_tablet_delete_bitmap(tablet, old_max_version, 
resp.rowset_meta(),
+                                                resp.stats(), req.idx(), 
&delete_bitmap);
+            if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) {
+                LOG_WARNING("rowset meta is expired, need to retry")
+                        .tag("tablet", tablet->tablet_id())
+                        .tag("tried", tried)
+                        .error(st);
+                continue;
+            }
+            if (!st.ok()) {
+                LOG_WARNING("failed to get delete bimtap")
+                        .tag("tablet", tablet->tablet_id())
+                        .error(st);
+                return st;
+            }
+            tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap);
+        }
+        {
+            const auto& stats = resp.stats();
+            std::unique_lock wlock(tablet->get_header_lock());
+
+            // ATTN: we are facing following data race
+            //
+            // 
resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8
+            //
+            //   BE-compaction-thread                 meta-service             
                        BE-query-thread
+            //            |                                |                   
                             |
+            //    local   |    commit cumu-compaction      |                   
                             |
+            //   cc_cnt=0 |  --------------------------->  |     sync rowset 
(long rpc, local cc_cnt=0 )    |   local
+            //            |                                |  
<-----------------------------------------    |  cc_cnt=0
+            //            |                                |  -.               
                             |
+            //    local   |       done cc_cnt=1            |    \              
                             |
+            //   cc_cnt=1 |  <---------------------------  |     \             
                             |
+            //            |                                |      \  returned 
with resp cc_cnt=0 (snapshot) |
+            //            |                                |       
'------------------------------------>   |   local
+            //            |                                |                   
                             |  cc_cnt=1
+            //            |                                |                   
                             |
+            //            |                                |                   
                             |  CHECK FAIL
+            //            |                                |                   
                             |  need retry
+            // To get rid of just retry syncing tablet
+            if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() ||
+                stats.cumulative_compaction_cnt() < 
tablet->cumulative_compaction_cnt())
+                    [[unlikely]] {
+                // stale request, ignore
+                LOG_WARNING("stale get rowset meta request")
+                        .tag("resp_base_compaction_cnt", 
stats.base_compaction_cnt())
+                        .tag("base_compaction_cnt", 
tablet->base_compaction_cnt())
+                        .tag("resp_cumulative_compaction_cnt", 
stats.cumulative_compaction_cnt())
+                        .tag("cumulative_compaction_cnt", 
tablet->cumulative_compaction_cnt())
+                        .tag("tried", tried);
+                if (tried++ < 10) continue;
+                return Status::OK();
+            }
+            std::vector<RowsetSharedPtr> rowsets;
+            rowsets.reserve(resp.rowset_meta().size());
+            for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) {
+                VLOG_DEBUG << "get rowset meta, tablet_id=" << 
cloud_rs_meta_pb.tablet_id()
+                           << ", version=[" << 
cloud_rs_meta_pb.start_version() << '-'
+                           << cloud_rs_meta_pb.end_version() << ']';
+                auto existed_rowset = tablet->get_rowset_by_version(
+                        {cloud_rs_meta_pb.start_version(), 
cloud_rs_meta_pb.end_version()});
+                if (existed_rowset &&
+                    existed_rowset->rowset_id().to_string() == 
cloud_rs_meta_pb.rowset_id_v2()) {
+                    continue; // Same rowset, skip it
+                }
+                RowsetMetaPB meta_pb = 
cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
+                auto rs_meta = std::make_shared<RowsetMeta>();
+                rs_meta->init_from_pb(meta_pb);
+                RowsetSharedPtr rowset;
+                // schema is nullptr implies using RowsetMeta.tablet_schema
+                Status s = RowsetFactory::create_rowset(nullptr, 
tablet->tablet_path(), rs_meta,
+                                                        &rowset);
+                if (!s.ok()) {
+                    LOG_WARNING("create rowset").tag("status", s);
+                    return s;
+                }
+                rowsets.push_back(std::move(rowset));
+            }
+            if (!rowsets.empty()) {
+                // `rowsets.empty()` could happen after doing EMPTY_CUMULATIVE 
compaction. e.g.:
+                //   BE has [0-1][2-11][12-12], [12-12] is delete predicate, 
cp is 2;
+                //   after doing EMPTY_CUMULATIVE compaction, MS cp is 13, 
get_rowset will return [2-11][12-12].
+                bool version_overlap =
+                        tablet->local_max_version() >= 
rowsets.front()->start_version();
+                tablet->add_rowsets(std::move(rowsets), version_overlap, 
wlock, warmup_delta_data);
+            }
+            tablet->last_base_compaction_success_time_ms = 
stats.last_base_compaction_time_ms();
+            tablet->last_cumu_compaction_success_time_ms = 
stats.last_cumu_compaction_time_ms();
+            tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
+            
tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+            tablet->set_cumulative_layer_point(stats.cumulative_point());
+            tablet->reset_approximate_stats(stats.num_rowsets(), 
stats.num_segments(),
+                                            stats.num_rows(), 
stats.data_size());
+        }
+        return Status::OK();
+    }
 }
 
 Status CloudMetaMgr::sync_tablet_delete_bitmap(
         CloudTablet* tablet, int64_t old_max_version,
-        const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
+        const google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>& rs_metas,
         const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* 
delete_bitmap) {
-    return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is 
not implemented");
+    if (rs_metas.empty()) {
+        return Status::OK();
+    }
+
+    std::shared_ptr<MetaService_Stub> stub;
+    RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+
+    int64_t new_max_version = std::max(old_max_version, 
rs_metas.rbegin()->end_version());
+    brpc::Controller cntl;
+    // When there are many delete bitmaps that need to be synchronized, it
+    // may take a longer time, especially when loading the tablet for the
+    // first time, so set a relatively long timeout time.
+    cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
+    GetDeleteBitmapRequest req;
+    GetDeleteBitmapResponse res;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_tablet_id(tablet->tablet_id());
+    req.set_base_compaction_cnt(stats.base_compaction_cnt());
+    req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+    req.set_cumulative_point(stats.cumulative_point());
+    *(req.mutable_idx()) = idx;
+    // New rowset sync all versions of delete bitmap
+    for (const auto& rs_meta : rs_metas) {
+        req.add_rowset_ids(rs_meta.rowset_id_v2());
+        req.add_begin_versions(0);
+        req.add_end_versions(new_max_version);
+    }
+
+    // old rowset sync incremental versions of delete bitmap
+    if (old_max_version > 0 && old_max_version < new_max_version) {
+        RowsetIdUnorderedSet all_rs_ids;
+        RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
+        for (const auto& rs_id : all_rs_ids) {
+            req.add_rowset_ids(rs_id.to_string());
+            req.add_begin_versions(old_max_version + 1);
+            req.add_end_versions(new_max_version);
+        }
+    }
+
+    VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
+    stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
+    if (cntl.Failed()) {
+        return Status::RpcError("failed to get delete bitmap: {}", 
cntl.ErrorText());
+    }
+    if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
+        return Status::NotFound("failed to get delete bitmap: {}", 
res.status().msg());
+    }
+    // The delete bitmap of stale rowsets will be removed when commit 
compaction job,
+    // then delete bitmap of stale rowsets cannot be obtained. But the rowsets 
obtained
+    // by sync_tablet_rowsets may include these stale rowsets. When this case 
happend, the
+    // error code of ROWSETS_EXPIRED will be returned, we need to retry sync 
rowsets again.
+    //
+    // Be query thread             meta-service          Be compaction thread
+    //      |                            |                         |
+    //      |        get rowset          |                         |
+    //      |--------------------------->|                         |
+    //      |    return get rowset       |                         |
+    //      |<---------------------------|                         |
+    //      |                            |        commit job       |
+    //      |                            |<------------------------|
+    //      |                            |    return commit job    |
+    //      |                            |------------------------>|
+    //      |      get delete bitmap     |                         |
+    //      |--------------------------->|                         |
+    //      |  return get delete bitmap  |                         |
+    //      |<---------------------------|                         |
+    //      |                            |                         |
+    if (res.status().code() == MetaServiceCode::ROWSETS_EXPIRED) {
+        return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get 
delete bitmap: {}",
+                                                                
res.status().msg());
+    }
+    if (res.status().code() != MetaServiceCode::OK) {
+        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get 
delete bitmap: {}",
+                                                               
res.status().msg());
+    }
+    const auto& rowset_ids = res.rowset_ids();
+    const auto& segment_ids = res.segment_ids();
+    const auto& vers = res.versions();
+    const auto& delete_bitmaps = res.segment_delete_bitmaps();
+    for (size_t i = 0; i < rowset_ids.size(); i++) {
+        RowsetId rst_id;
+        rst_id.init(rowset_ids[i]);
+        delete_bitmap->merge({rst_id, segment_ids[i], vers[i]},
+                             roaring::Roaring::read(delete_bitmaps[i].data()));
+    }
+    return Status::OK();
 }
 
 Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp,
                                     RowsetMetaSharedPtr* existed_rs_meta) {
-    return Status::NotSupported("CloudMetaMgr::prepare_rowset is not 
implemented");
+    VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
+               << ", rowset_id: " << rs_meta.rowset_id() << ", is_tmp: " << 
is_tmp;
+
+    CreateRowsetRequest req;
+    CreateRowsetResponse resp;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_temporary(is_tmp);
+
+    RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true);
+    rs_meta.to_rowset_pb(&doris_rs_meta, true);
+    doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), 
std::move(doris_rs_meta));
+
+    Status st = retry_rpc("prepare rowset", req, &resp, 
&MetaService_Stub::prepare_rowset);
+    if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
+        if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
+            RowsetMetaPB doris_rs_meta =
+                    
cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta()));
+            *existed_rs_meta = std::make_shared<RowsetMeta>();
+            (*existed_rs_meta)->init_from_pb(doris_rs_meta);
+        }
+        return Status::AlreadyExist("failed to prepare rowset: {}", 
resp.status().msg());
+    }
+    return st;
 }
 
 Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, bool is_tmp,
                                    RowsetMetaSharedPtr* existed_rs_meta) {
-    return Status::NotSupported("CloudMetaMgr::commit_rowset is not 
implemented");
+    VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
+               << ", rowset_id: " << rs_meta.rowset_id() << ", is_tmp: " << 
is_tmp;
+    CreateRowsetRequest req;
+    CreateRowsetResponse resp;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_temporary(is_tmp);
+
+    RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb();
+    doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), 
std::move(rs_meta_pb));
+    Status st = retry_rpc("commit rowset", req, &resp, 
&MetaService_Stub::commit_rowset);
+    if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
+        if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
+            RowsetMetaPB doris_rs_meta =
+                    
cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta()));
+            *existed_rs_meta = std::make_shared<RowsetMeta>();
+            (*existed_rs_meta)->init_from_pb(doris_rs_meta);
+        }
+        return Status::AlreadyExist("failed to commit rowset: {}", 
resp.status().msg());
+    }
+    return st;
 }
 
 Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
-    return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not 
implemented");
+    VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id()
+               << ", rowset_id: " << rs_meta.rowset_id();
+    CreateRowsetRequest req;
+    CreateRowsetResponse resp;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+
+    RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(true);
+    doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), 
std::move(rs_meta_pb));
+    Status st =
+            retry_rpc("update committed rowset", req, &resp, 
&MetaService_Stub::update_tmp_rowset);
+    if (!st.ok() && resp.status().code() == 
MetaServiceCode::ROWSET_META_NOT_FOUND) {
+        return Status::InternalError("failed to update committed rowset: {}", 
resp.status().msg());
+    }
+    return st;
 }
 
 Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
@@ -436,15 +750,14 @@ Status CloudMetaMgr::update_delete_bitmap(const 
CloudTablet& tablet, int64_t loc
     req.set_tablet_id(tablet.tablet_id());
     req.set_lock_id(lock_id);
     req.set_initiator(initiator);
-    for (auto iter = delete_bitmap->delete_bitmap.begin();
-         iter != delete_bitmap->delete_bitmap.end(); ++iter) {
-        req.add_rowset_ids(std::get<0>(iter->first).to_string());
-        req.add_segment_ids(std::get<1>(iter->first));
-        req.add_versions(std::get<2>(iter->first));
+    for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
+        req.add_rowset_ids(std::get<0>(key).to_string());
+        req.add_segment_ids(std::get<1>(key));
+        req.add_versions(std::get<2>(key));
         // To save space, convert array and bitmap containers to run containers
-        iter->second.runOptimize();
-        std::string bitmap_data(iter->second.getSizeInBytes(), '\0');
-        iter->second.write(bitmap_data.data());
+        bitmap.runOptimize();
+        std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
+        bitmap.write(bitmap_data.data());
         *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
     }
     auto st = retry_rpc("update delete bitmap", req, &res, 
&MetaService_Stub::update_delete_bitmap);
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index af5b048b2f0..59f11c17152 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -88,7 +88,7 @@ public:
 private:
     Status sync_tablet_delete_bitmap(
             CloudTablet* tablet, int64_t old_max_version,
-            const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
+            const google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>& 
rs_metas,
             const TabletStatsPB& stas, const TabletIndexPB& idx, DeleteBitmap* 
delete_bitmap);
 };
 
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 6beb1e45d94..c83731fe83f 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -53,7 +53,7 @@ Status CloudTablet::capture_rs_readers(const Version& 
spec_version,
     if (!st.ok()) {
         rlock.unlock(); // avoid logging in lock range
         // Check no missed versions or req version is merged
-        auto missed_versions = calc_missed_versions(spec_version.second);
+        auto missed_versions = get_missed_versions(spec_version.second);
         if (missed_versions.empty()) {
             st.set_code(VERSION_ALREADY_MERGED); // Reset error code
         }
@@ -67,50 +67,6 @@ Status CloudTablet::capture_rs_readers(const Version& 
spec_version,
     return capture_rs_readers_unlocked(version_path, rs_splits);
 }
 
-// for example:
-//     [0-4][5-5][8-8][9-9][13-13]
-// if spec_version = 12, it will return [6-7],[10-12]
-Versions CloudTablet::calc_missed_versions(int64_t spec_version) {
-    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
-
-    Versions missed_versions;
-    Versions existing_versions;
-    {
-        std::shared_lock rdlock(_meta_lock);
-        for (const auto& rs : _tablet_meta->all_rs_metas()) {
-            existing_versions.emplace_back(rs->version());
-        }
-    }
-
-    // sort the existing versions in ascending order
-    std::sort(existing_versions.begin(), existing_versions.end(),
-              [](const Version& a, const Version& b) {
-                  // simple because 2 versions are certainly not overlapping
-                  return a.first < b.first;
-              });
-
-    auto min_version = existing_versions.front().first;
-    if (min_version > 0) {
-        missed_versions.emplace_back(0, std::min(spec_version, min_version - 
1));
-    }
-    for (auto it = existing_versions.begin(); it != existing_versions.end() - 
1; ++it) {
-        auto prev_v = it->second;
-        if (prev_v >= spec_version) {
-            return missed_versions;
-        }
-        auto next_v = (it + 1)->first;
-        if (next_v > prev_v + 1) {
-            // there is a hole between versions
-            missed_versions.emplace_back(prev_v + 1, std::min(spec_version, 
next_v - 1));
-        }
-    }
-    auto max_version = existing_versions.back().second;
-    if (max_version < spec_version) {
-        missed_versions.emplace_back(max_version + 1, spec_version);
-    }
-    return missed_versions;
-}
-
 Status CloudTablet::sync_meta() {
     // TODO(lightman): FileCache
     return Status::NotSupported("CloudTablet::sync_meta is not implemented");
@@ -451,4 +407,12 @@ void CloudTablet::get_compaction_status(std::string* 
json_result) {
     *json_result = std::string(strbuf.GetString());
 }
 
+inline void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
+    // cumulative point should only be reset to -1, or be increased
+    CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= 
_cumulative_point)
+            << "Unexpected cumulative point: " << new_point
+            << ", origin: " << _cumulative_point.load();
+    _cumulative_point = new_point;
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index bf8db3c9451..bfd22d2f54c 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -20,7 +20,6 @@
 #include <atomic>
 
 #include "olap/base_tablet.h"
-#include "olap/version_graph.h"
 
 namespace doris {
 
@@ -64,7 +63,7 @@ public:
     // If tablet state is not `TABLET_RUNNING`, sync tablet meta and all 
visible rowsets.
     // If `query_version` > 0 and local max_version of the tablet >= 
`query_version`, do nothing.
     // If 'need_download_data_async' is true, it means that we need to 
download the new version
-    // rowsets datas async.
+    // rowsets datum async.
     Status sync_rowsets(int64_t query_version = -1, bool warmup_delta_data = 
false);
 
     // Synchronize the tablet meta from meta service.
@@ -74,7 +73,7 @@ public:
     // If 'warmup_delta_data' is true, download the new version rowset data in 
background.
     // MUST hold EXCLUSIVE `_meta_lock`.
     // If 'need_download_data_async' is true, it means that we need to 
download the new version
-    // rowsets datas async.
+    // rowsets datum async.
     void add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
                      std::unique_lock<std::shared_mutex>& meta_lock,
                      bool warmup_delta_data = false);
@@ -98,6 +97,17 @@ public:
     int64_t get_cloud_base_compaction_score() const;
     int64_t get_cloud_cumu_compaction_score() const;
 
+    int64_t local_max_version() const { return _max_version; }
+    int64_t base_compaction_cnt() const { return _base_compaction_cnt; }
+    int64_t cumulative_compaction_cnt() const { return 
_cumulative_compaction_cnt; }
+    int64_t cumulative_layer_point() const {
+        return _cumulative_point.load(std::memory_order_relaxed);
+    }
+
+    void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
+    void set_cumulative_compaction_cnt(int64_t cnt) { 
_cumulative_compaction_cnt = cnt; }
+    void set_cumulative_layer_point(int64_t new_point);
+
     int64_t last_sync_time_s = 0;
     int64_t last_load_time_ms = 0;
     int64_t last_base_compaction_success_time_ms = 0;
@@ -105,8 +115,6 @@ public:
     int64_t last_cumu_no_suitable_version_ms = 0;
 
 private:
-    Versions calc_missed_versions(int64_t spec_version);
-
     // FIXME(plat1ko): No need to record base size if rowsets are ordered by 
version
     void update_base_size(const Rowset& rs);
 
@@ -126,8 +134,8 @@ private:
     // Number of sorted arrays (e.g. for rowset with N segments, if rowset is 
overlapping, delta is N, otherwise 1) after cumu point
     std::atomic<int64_t> _approximate_cumu_num_deltas {-1};
 
-    [[maybe_unused]] int64_t _base_compaction_cnt = 0;
-    [[maybe_unused]] int64_t _cumulative_compaction_cnt = 0;
+    int64_t _base_compaction_cnt = 0;
+    int64_t _cumulative_compaction_cnt = 0;
     int64_t _max_version = -1;
     int64_t _base_size = 0;
 };
diff --git a/be/src/common/status.h b/be/src/common/status.h
index a4ab9f60b0f..3fcd0410090 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -276,7 +276,8 @@ namespace ErrorCode {
     E(KEY_NOT_FOUND, -7000, false);                          \
     E(KEY_ALREADY_EXISTS, -7001, false);                     \
     E(ENTRY_NOT_FOUND, -7002, false);                        \
-    E(INVALID_TABLET_STATE, -7211, false);
+    E(INVALID_TABLET_STATE, -7211, false);                   \
+    E(ROWSETS_EXPIRED, -7311, false);
 
 // Define constexpr int error_code_name = error_code_value
 #define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE;
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 18445cb17a6..8612e4d83bd 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -22,7 +22,6 @@
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_reader.h"
 #include "olap/tablet_fwd.h"
-#include "olap/tablet_schema_cache.h"
 #include "util/doris_metrics.h"
 #include "vec/common/schema_util.h"
 
@@ -81,7 +80,7 @@ Status BaseTablet::update_by_least_common_schema(const 
TabletSchemaSPtr& update_
     return Status::OK();
 }
 
-Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>& 
version_path,
+Status BaseTablet::capture_rs_readers_unlocked(const Versions& version_path,
                                                std::vector<RowSetSplits>* 
rs_splits) const {
     DCHECK(rs_splits != nullptr && rs_splits->empty());
     for (auto version : version_path) {
@@ -104,11 +103,143 @@ Status BaseTablet::capture_rs_readers_unlocked(const 
std::vector<Version>& versi
             return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
                     "failed to create reader for rowset:{}", 
it->second->rowset_id().to_string());
         }
-        rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
+        rs_splits->emplace_back(std::move(rs_reader));
     }
     return Status::OK();
 }
 
+// snapshot manager may call this api to check if version exists, so that
+// the version maybe not exist
+RowsetSharedPtr BaseTablet::get_rowset_by_version(const Version& version,
+                                                  bool find_in_stale) const {
+    auto iter = _rs_version_map.find(version);
+    if (iter == _rs_version_map.end()) {
+        if (find_in_stale) {
+            return get_stale_rowset_by_version(version);
+        }
+        return nullptr;
+    }
+    return iter->second;
+}
+
+RowsetSharedPtr BaseTablet::get_stale_rowset_by_version(const Version& 
version) const {
+    auto iter = _stale_rs_version_map.find(version);
+    if (iter == _stale_rs_version_map.end()) {
+        VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << 
tablet_id();
+        return nullptr;
+    }
+    return iter->second;
+}
+
+// Already under _meta_lock
+RowsetSharedPtr BaseTablet::get_rowset_with_max_version() const {
+    Version max_version = _tablet_meta->max_version();
+    if (max_version.first == -1) {
+        return nullptr;
+    }
+
+    auto iter = _rs_version_map.find(max_version);
+    if (iter == _rs_version_map.end()) {
+        DCHECK(false) << "invalid version:" << max_version;
+        return nullptr;
+    }
+    return iter->second;
+}
+
+Status BaseTablet::get_all_rs_id(int64_t max_version, RowsetIdUnorderedSet* 
rowset_ids) const {
+    std::shared_lock rlock(_meta_lock);
+    return get_all_rs_id_unlocked(max_version, rowset_ids);
+}
+
+Status BaseTablet::get_all_rs_id_unlocked(int64_t max_version,
+                                          RowsetIdUnorderedSet* rowset_ids) 
const {
+    //  Ensure that the obtained versions of rowsets are continuous
+    Version spec_version(0, max_version);
+    Versions version_path;
+    auto st = 
_timestamped_version_tracker.capture_consistent_versions(spec_version, 
&version_path);
+    if (!st.ok()) [[unlikely]] {
+        return st;
+    }
+
+    for (auto& ver : version_path) {
+        if (ver.second == 1) {
+            // [0-1] rowset is empty for each tablet, skip it
+            continue;
+        }
+        auto it = _rs_version_map.find(ver);
+        if (it == _rs_version_map.end()) {
+            return Status::Error<CAPTURE_ROWSET_ERROR, false>(
+                    "fail to find Rowset for version. tablet={}, version={}", 
tablet_id(),
+                    ver.to_string());
+        }
+        rowset_ids->emplace(it->second->rowset_id());
+    }
+    return Status::OK();
+}
+
+Versions BaseTablet::get_missed_versions(int64_t spec_version) const {
+    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+    Versions existing_versions;
+    {
+        std::shared_lock rdlock(_meta_lock);
+        for (const auto& rs : _tablet_meta->all_rs_metas()) {
+            existing_versions.emplace_back(rs->version());
+        }
+    }
+    return calc_missed_versions(spec_version, existing_versions);
+}
+
+Versions BaseTablet::get_missed_versions_unlocked(int64_t spec_version) const {
+    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+    Versions existing_versions;
+    for (const auto& rs : _tablet_meta->all_rs_metas()) {
+        existing_versions.emplace_back(rs->version());
+    }
+    return calc_missed_versions(spec_version, existing_versions);
+}
+
+Versions BaseTablet::calc_missed_versions(int64_t spec_version, Versions 
existing_versions) {
+    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+    // sort the existing versions in ascending order
+    std::sort(existing_versions.begin(), existing_versions.end(),
+              [](const Version& a, const Version& b) {
+                  // simple because 2 versions are certainly not overlapping
+                  return a.first < b.first;
+              });
+
+    // From the first version(=0), find the missing version until spec_version
+    int64_t last_version = -1;
+    Versions missed_versions;
+    for (const Version& version : existing_versions) {
+        if (version.first > last_version + 1) {
+            // there is a hole between versions
+            missed_versions.emplace_back(last_version + 1, 
std::min(version.first, spec_version));
+        }
+        last_version = version.second;
+        if (last_version >= spec_version) {
+            break;
+        }
+    }
+    if (last_version < spec_version) {
+        // there is a hole between the last version and the specificed version.
+        missed_versions.emplace_back(last_version + 1, spec_version);
+    }
+    return missed_versions;
+}
+
+void BaseTablet::_print_missed_versions(const Versions& missed_versions) const 
{
+    std::stringstream ss;
+    ss << tablet_id() << " has " << missed_versions.size() << " missed 
version:";
+    // print at most 10 version
+    for (int i = 0; i < 10 && i < missed_versions.size(); ++i) {
+        ss << missed_versions[i] << ",";
+    }
+    LOG(WARNING) << ss.str();
+}
+
 bool BaseTablet::_reconstruct_version_tracker_if_necessary() {
     double orphan_vertex_ratio = 
_timestamped_version_tracker.get_orphan_vertex_ratio();
     if (orphan_vertex_ratio >= 
config::tablet_version_graph_orphan_vertex_ratio) {
@@ -119,4 +250,4 @@ bool 
BaseTablet::_reconstruct_version_tracker_if_necessary() {
     return false;
 }
 
-} /* namespace doris */
+} // namespace doris
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index bb327b39532..25c54c47cda 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -44,6 +44,7 @@ public:
     TabletState tablet_state() const { return _tablet_meta->tablet_state(); }
     Status set_tablet_state(TabletState state);
     int64_t table_id() const { return _tablet_meta->table_id(); }
+    int64_t index_id() const { return _tablet_meta->index_id(); }
     int64_t partition_id() const { return _tablet_meta->partition_id(); }
     int64_t tablet_id() const { return _tablet_meta->tablet_id(); }
     int32_t schema_hash() const { return _tablet_meta->schema_hash(); }
@@ -85,16 +86,37 @@ public:
     virtual size_t tablet_footprint() = 0;
 
     // MUST hold shared meta lock
-    Status capture_rs_readers_unlocked(const std::vector<Version>& 
version_path,
+    Status capture_rs_readers_unlocked(const Versions& version_path,
                                        std::vector<RowSetSplits>* rs_splits) 
const;
 
+    // _rs_version_map and _stale_rs_version_map should be protected by 
_meta_lock
+    // The caller must call hold _meta_lock when call this three function.
+    RowsetSharedPtr get_rowset_by_version(const Version& version, bool 
find_is_stale = false) const;
+    RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const;
+    RowsetSharedPtr get_rowset_with_max_version() const;
+
+    Status get_all_rs_id(int64_t max_version, RowsetIdUnorderedSet* 
rowset_ids) const;
+    Status get_all_rs_id_unlocked(int64_t max_version, RowsetIdUnorderedSet* 
rowset_ids) const;
+
+    // Get the missed versions until the spec_version.
+    Versions get_missed_versions(int64_t spec_version) const;
+    Versions get_missed_versions_unlocked(int64_t spec_version) const;
+
 protected:
+    // Find the missed versions until the spec_version.
+    //
+    // for example:
+    //     [0-4][5-5][8-8][9-9][14-14]
+    // if spec_version = 12, it will return [6-7],[10-12]
+    static Versions calc_missed_versions(int64_t spec_version, Versions 
existing_versions);
+
+    void _print_missed_versions(const Versions& missed_versions) const;
     bool _reconstruct_version_tracker_if_necessary();
 
     mutable std::shared_mutex _meta_lock;
     TimestampedVersionTracker _timestamped_version_tracker;
     // After version 0.13, all newly created rowsets are saved in 
_rs_version_map.
-    // And if rowset being compacted, the old rowsetis will be saved in 
_stale_rs_version_map;
+    // And if rowset being compacted, the old rowsets will be saved in 
_stale_rs_version_map;
     std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> 
_rs_version_map;
     // This variable _stale_rs_version_map is used to record these rowsets 
which are be compacted.
     // These _stale rowsets are been removed when rowsets' pathVersion is 
expired,
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index bc84bbcb84e..53d68c56647 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -560,18 +560,16 @@ Status Compaction::do_compaction_impl(int64_t permits) {
         _tablet->set_last_full_compaction_success_time(now);
     }
 
-    int64_t current_max_version;
+    int64_t current_max_version = -1;
     {
         std::shared_lock rdlock(_tablet->get_header_lock());
-        RowsetSharedPtr max_rowset = _tablet->rowset_with_max_version();
-        if (max_rowset == nullptr) {
-            current_max_version = -1;
-        } else {
-            current_max_version = 
_tablet->rowset_with_max_version()->end_version();
+        current_max_version = -1;
+        if (RowsetSharedPtr max_rowset = 
_tablet->get_rowset_with_max_version()) {
+            current_max_version = max_rowset->end_version();
         }
     }
 
-    auto cumu_policy = _tablet->cumulative_compaction_policy();
+    auto* cumu_policy = _tablet->cumulative_compaction_policy();
     DCHECK(cumu_policy);
     LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << 
vertical_compaction
               << ". tablet=" << _tablet->tablet_id() << ", output_version=" << 
_output_version
diff --git a/be/src/olap/rowset/rowset_meta.cpp 
b/be/src/olap/rowset/rowset_meta.cpp
index 7f4798f97e9..d762a438bf9 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -102,18 +102,20 @@ void RowsetMeta::set_fs(io::FileSystemSPtr fs) {
     _fs = std::move(fs);
 }
 
-void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const {
+void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema) 
const {
     *rs_meta_pb = _rowset_meta_pb;
-    if (_schema) {
-        _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+    if (_schema) [[likely]] {
+        rs_meta_pb->set_schema_version(_schema->schema_version());
+        if (!skip_schema) {
+            // For cloud, separate tablet schema from rowset meta to reduce 
persistent size.
+            _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+        }
     }
 }
 
-RowsetMetaPB RowsetMeta::get_rowset_pb() {
-    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
-    if (_schema) {
-        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
-    }
+RowsetMetaPB RowsetMeta::get_rowset_pb(bool skip_schema) const {
+    RowsetMetaPB rowset_meta_pb;
+    to_rowset_pb(&rowset_meta_pb, skip_schema);
     return rowset_meta_pb;
 }
 
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 7e1dfaa57c3..d2180a46329 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -197,9 +197,11 @@ public:
 
     void set_num_segments(int64_t num_segments) { 
_rowset_meta_pb.set_num_segments(num_segments); }
 
-    void to_rowset_pb(RowsetMetaPB* rs_meta_pb) const;
+    // Convert to RowsetMetaPB, skip_schema is only used by cloud to separate 
schema from rowset meta.
+    void to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema = false) 
const;
 
-    RowsetMetaPB get_rowset_pb();
+    // Convert to RowsetMetaPB, skip_schema is only used by cloud to separate 
schema from rowset meta.
+    RowsetMetaPB get_rowset_pb(bool skip_schema = false) const;
 
     inline DeletePredicatePB* mutable_delete_pred_pb() {
         return _rowset_meta_pb.mutable_delete_predicate();
@@ -302,7 +304,7 @@ public:
     void set_tablet_schema(const TabletSchemaSPtr& tablet_schema);
     void set_tablet_schema(const TabletSchemaPB& tablet_schema);
 
-    const TabletSchemaSPtr& tablet_schema() { return _schema; }
+    const TabletSchemaSPtr& tablet_schema() const { return _schema; }
 
     // Because the member field '_handle' is a raw pointer, use member func 
'init' to replace copy ctor
     RowsetMeta(const RowsetMeta&) = delete;
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index e7fb520c0bf..0c951405b28 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -128,7 +128,7 @@ Status 
RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
         }
         _rowset_ids.clear();
     } else {
-        RETURN_IF_ERROR(tablet()->all_rs_id(cur_max_version, &_rowset_ids));
+        RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version, 
&_rowset_ids));
     }
     _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id());
     mow_context =
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 327afd6a8c4..982a91c38e0 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -979,7 +979,7 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t 
tablet_id) {
 Status SchemaChangeHandler::_get_versions_to_be_changed(
         TabletSharedPtr base_tablet, std::vector<Version>* 
versions_to_be_changed,
         RowsetSharedPtr* max_rowset) {
-    RowsetSharedPtr rowset = base_tablet->rowset_with_max_version();
+    RowsetSharedPtr rowset = base_tablet->get_rowset_with_max_version();
     if (rowset == nullptr) {
         return Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>("Tablet has no 
version. base_tablet={}",
                                                           
base_tablet->tablet_id());
diff --git a/be/src/olap/single_replica_compaction.cpp 
b/be/src/olap/single_replica_compaction.cpp
index 72a0cdfc01c..7d78ae1006b 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -157,14 +157,11 @@ Status 
SingleReplicaCompaction::_do_single_replica_compaction_impl() {
         _tablet->set_last_full_compaction_success_time(UnixMillis());
     }
 
-    int64_t current_max_version;
+    int64_t current_max_version = -1;
     {
         std::shared_lock rdlock(_tablet->get_header_lock());
-        RowsetSharedPtr max_rowset = _tablet->rowset_with_max_version();
-        if (max_rowset == nullptr) {
-            current_max_version = -1;
-        } else {
-            current_max_version = 
_tablet->rowset_with_max_version()->end_version();
+        if (RowsetSharedPtr max_rowset = 
_tablet->get_rowset_with_max_version()) {
+            current_max_version = max_rowset->end_version();
         }
     }
 
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index eb42ad4ac72..93b8c30ff54 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -492,7 +492,7 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
                 consistent_rowsets.clear(); // reset vector
 
                 // get latest version
-                const RowsetSharedPtr last_version = 
ref_tablet->rowset_with_max_version();
+                const RowsetSharedPtr last_version = 
ref_tablet->get_rowset_with_max_version();
                 if (last_version == nullptr) {
                     res = Status::InternalError("tablet has not any version. 
path={}",
                                                 ref_tablet->tablet_id());
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 50ec2304096..96f00ccd380 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -601,44 +601,6 @@ void Tablet::delete_rowsets(const 
std::vector<RowsetSharedPtr>& to_delete, bool
     }
 }
 
-// snapshot manager may call this api to check if version exists, so that
-// the version maybe not exist
-const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version,
-                                                    bool find_in_stale) const {
-    auto iter = _rs_version_map.find(version);
-    if (iter == _rs_version_map.end()) {
-        if (find_in_stale) {
-            return get_stale_rowset_by_version(version);
-        }
-        return nullptr;
-    }
-    return iter->second;
-}
-
-const RowsetSharedPtr Tablet::get_stale_rowset_by_version(const Version& 
version) const {
-    auto iter = _stale_rs_version_map.find(version);
-    if (iter == _stale_rs_version_map.end()) {
-        VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << 
tablet_id();
-        return nullptr;
-    }
-    return iter->second;
-}
-
-// Already under _meta_lock
-const RowsetSharedPtr Tablet::rowset_with_max_version() const {
-    Version max_version = _tablet_meta->max_version();
-    if (max_version.first == -1) {
-        return nullptr;
-    }
-
-    auto iter = _rs_version_map.find(max_version);
-    if (iter == _rs_version_map.end()) {
-        DCHECK(false) << "invalid version:" << max_version;
-        return nullptr;
-    }
-    return iter->second;
-}
-
 TabletSchemaSPtr Tablet::tablet_schema_with_merged_max_schema_version(
         const std::vector<RowsetMetaSharedPtr>& rowset_metas) {
     RowsetMetaSharedPtr max_schema_version_rs = *std::max_element(
@@ -729,16 +691,14 @@ void Tablet::delete_expired_stale_rowset() {
             return;
         }
 
-        const RowsetSharedPtr lastest_delta = rowset_with_max_version();
+        const RowsetSharedPtr lastest_delta = get_rowset_with_max_version();
         if (lastest_delta == nullptr) {
             LOG(WARNING) << "lastest_delta is null " << tablet_id();
             return;
         }
 
         // fetch missing version before delete
-        std::vector<Version> missed_versions;
-        calc_missed_versions_unlocked(lastest_delta->end_version(), 
&missed_versions);
-
+        Versions missed_versions = 
get_missed_versions_unlocked(lastest_delta->end_version());
         if (!missed_versions.empty()) {
             LOG(WARNING) << "tablet:" << tablet_id()
                          << ", missed version for version:" << 
lastest_delta->end_version();
@@ -762,8 +722,8 @@ void Tablet::delete_expired_stale_rowset() {
             // 1. When there is no consistent versions, we must reconstruct 
the tracker.
             if (!status.ok()) {
                 // 2. fetch missing version after delete
-                std::vector<Version> after_missed_versions;
-                calc_missed_versions_unlocked(lastest_delta->end_version(), 
&after_missed_versions);
+                Versions after_missed_versions =
+                        
get_missed_versions_unlocked(lastest_delta->end_version());
 
                 // 2.1 check whether missed_versions and after_missed_versions 
are the same.
                 // when they are the same, it means we can delete the path 
securely.
@@ -788,9 +748,8 @@ void Tablet::delete_expired_stale_rowset() {
 
                     // 4. double check the consistent versions
                     // fetch missing version after recover
-                    std::vector<Version> recover_missed_versions;
-                    calc_missed_versions_unlocked(lastest_delta->end_version(),
-                                                  &recover_missed_versions);
+                    Versions recover_missed_versions =
+                            
get_missed_versions_unlocked(lastest_delta->end_version());
 
                     // 4.1 check whether missed_versions and 
recover_missed_versions are the same.
                     // when they are the same, it means we recover 
successfully.
@@ -869,13 +828,12 @@ void Tablet::delete_expired_stale_rowset() {
 }
 
 Status Tablet::capture_consistent_versions_unlocked(const Version& 
spec_version,
-                                                    std::vector<Version>* 
version_path,
+                                                    Versions* version_path,
                                                     bool skip_missing_version, 
bool quiet) const {
     Status status =
             
_timestamped_version_tracker.capture_consistent_versions(spec_version, 
version_path);
     if (!status.ok() && !quiet) {
-        std::vector<Version> missed_versions;
-        calc_missed_versions_unlocked(spec_version.second, &missed_versions);
+        Versions missed_versions = 
get_missed_versions_unlocked(spec_version.second);
         if (missed_versions.empty()) {
             // if version_path is null, it may be a compaction check logic.
             // so to avoid print too many logs.
@@ -1084,46 +1042,6 @@ uint32_t Tablet::_calc_base_compaction_score() const {
     return base_rowset_exist ? score : 0;
 }
 
-void Tablet::calc_missed_versions(int64_t spec_version, std::vector<Version>* 
missed_versions) {
-    std::shared_lock rdlock(_meta_lock);
-    calc_missed_versions_unlocked(spec_version, missed_versions);
-}
-
-// for example:
-//     [0-4][5-5][8-8][9-9]
-// if spec_version = 6, we still return {7} other than {6, 7}
-void Tablet::calc_missed_versions_unlocked(int64_t spec_version,
-                                           std::vector<Version>* 
missed_versions) const {
-    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
-    std::list<Version> existing_versions;
-    for (auto& rs : _tablet_meta->all_rs_metas()) {
-        existing_versions.emplace_back(rs->version());
-    }
-
-    // sort the existing versions in ascending order
-    existing_versions.sort([](const Version& a, const Version& b) {
-        // simple because 2 versions are certainly not overlapping
-        return a.first < b.first;
-    });
-
-    // From the first version(=0),  find the missing version until spec_version
-    int64_t last_version = -1;
-    for (const Version& version : existing_versions) {
-        if (version.first > last_version + 1) {
-            for (int64_t i = last_version + 1; i < version.first && i <= 
spec_version; ++i) {
-                missed_versions->emplace_back(Version(i, i));
-            }
-        }
-        last_version = version.second;
-        if (last_version >= spec_version) {
-            break;
-        }
-    }
-    for (int64_t i = last_version + 1; i <= spec_version; ++i) {
-        missed_versions->emplace_back(Version(i, i));
-    }
-}
-
 void Tablet::max_continuous_version_from_beginning(Version* version, Version* 
max_version) {
     bool has_version_cross;
     std::shared_lock rdlock(_meta_lock);
@@ -1224,16 +1142,6 @@ bool Tablet::check_path(const std::string& 
path_to_check) const {
     return false;
 }
 
-void Tablet::_print_missed_versions(const std::vector<Version>& 
missed_versions) const {
-    std::stringstream ss;
-    ss << tablet_id() << " has " << missed_versions.size() << " missed 
version:";
-    // print at most 10 version
-    for (int i = 0; i < 10 && i < missed_versions.size(); ++i) {
-        ss << missed_versions[i] << ",";
-    }
-    LOG(WARNING) << ss.str();
-}
-
 Status Tablet::_contains_version(const Version& version) {
     // check if there exist a rowset contains the added rowset
     for (auto& it : _rs_version_map) {
@@ -1984,13 +1892,12 @@ Status 
Tablet::create_transient_rowset_writer(RowsetWriterContext& context,
 
 void Tablet::_init_context_common_fields(RowsetWriterContext& context) {
     context.tablet_uid = tablet_uid();
-
     context.tablet_id = tablet_id();
     context.partition_id = partition_id();
     context.tablet_schema_hash = schema_hash();
     context.rowset_type = tablet_meta()->preferred_rowset_type();
     // Alpha Rowset will be removed in the future, so that if the tablet's 
default rowset type is
-    // alpah rowset, then set the newly created rowset to storage engine's 
default rowset.
+    // alpha rowset, then set the newly created rowset to storage engine's 
default rowset.
     if (context.rowset_type == ALPHA_ROWSET) {
         context.rowset_type = _engine.default_rowset_type();
     }
@@ -3132,7 +3039,7 @@ Status Tablet::update_delete_bitmap_without_lock(const 
RowsetSharedPtr& rowset)
         return Status::OK();
     }
     RowsetIdUnorderedSet cur_rowset_ids;
-    RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
+    RETURN_IF_ERROR(get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids));
     DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet_id());
     RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, 
delete_bitmap));
 
@@ -3181,7 +3088,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
     {
         std::shared_lock meta_rlock(_meta_lock);
         cur_version = max_version_unlocked().second;
-        RETURN_IF_ERROR(all_rs_id(cur_version, &cur_rowset_ids));
+        RETURN_IF_ERROR(get_all_rs_id_unlocked(cur_version, &cur_rowset_ids));
         _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, 
&rowset_ids_to_add,
                                &rowset_ids_to_del);
         specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
@@ -3227,7 +3134,7 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
                       << tablet_id();
             return Status::OK();
         }
-        RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
+        RETURN_IF_ERROR(get_all_rs_id_unlocked(cur_version - 1, 
&cur_rowset_ids));
     }
     auto t2 = watch.get_elapse_time_us();
 
@@ -3394,27 +3301,6 @@ Status Tablet::check_rowid_conversion(
     return Status::OK();
 }
 
-Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* 
rowset_ids) const {
-    //  Ensure that the obtained versions of rowsets are continuous
-    std::vector<Version> version_path;
-    RETURN_IF_ERROR(capture_consistent_versions_unlocked(Version(0, 
max_version), &version_path,
-                                                         false, false));
-    for (auto& ver : version_path) {
-        if (ver.second == 1) {
-            // [0-1] rowset is empty for each tablet, skip it
-            continue;
-        }
-        auto it = _rs_version_map.find(ver);
-        if (it == _rs_version_map.end()) {
-            return Status::Error<CAPTURE_ROWSET_ERROR, false>(
-                    "fail to find Rowset for version. tablet={}, version={}", 
tablet_id(),
-                    ver.to_string());
-        }
-        rowset_ids->emplace(it->second->rowset_id());
-    }
-    return Status::OK();
-}
-
 bool Tablet::check_all_rowset_segment() {
     std::shared_lock rdlock(_meta_lock);
     for (auto& version_rowset : _rs_version_map) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d953d8fce4f..e17c8b8ff85 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -149,14 +149,6 @@ public:
     Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
                           std::vector<RowsetSharedPtr>& to_delete, bool 
check_delete = false);
 
-    // _rs_version_map and _stale_rs_version_map should be protected by 
_meta_lock
-    // The caller must call hold _meta_lock when call this two function.
-    const RowsetSharedPtr get_rowset_by_version(const Version& version,
-                                                bool find_is_stale = false) 
const;
-    const RowsetSharedPtr get_stale_rowset_by_version(const Version& version) 
const;
-
-    const RowsetSharedPtr rowset_with_max_version() const;
-
     static TabletSchemaSPtr tablet_schema_with_merged_max_schema_version(
             const std::vector<RowsetMetaSharedPtr>& rowset_metas);
 
@@ -170,9 +162,9 @@ public:
     // Given spec_version, find a continuous version path and store it in 
version_path.
     // If quiet is true, then only "does this path exist" is returned.
     // If skip_missing_version is true, return ok even there are missing 
versions.
-    Status capture_consistent_versions_unlocked(const Version& spec_version,
-                                                std::vector<Version>* 
version_path,
+    Status capture_consistent_versions_unlocked(const Version& spec_version, 
Versions* version_path,
                                                 bool skip_missing_version, 
bool quiet) const;
+
     // if quiet is true, no error log will be printed if there are missing 
versions
     Status check_version_integrity(const Version& version, bool quiet = false);
     bool check_version_exist(const Version& version) const;
@@ -204,11 +196,6 @@ public:
             CompactionType compaction_type,
             std::shared_ptr<CumulativeCompactionPolicy> 
cumulative_compaction_policy);
 
-    // operation for clone
-    void calc_missed_versions(int64_t spec_version, std::vector<Version>* 
missed_versions);
-    void calc_missed_versions_unlocked(int64_t spec_version,
-                                       std::vector<Version>* missed_versions) 
const;
-
     // This function to find max continuous version from the beginning.
     // For example: If there are 1, 2, 3, 5, 6, 7 versions belongs tablet, 
then 3 is target.
     // 3 will be saved in "version", and 7 will be saved in "max_version", if 
max_version != nullptr
@@ -483,7 +470,6 @@ public:
             RowsetSharedPtr dst_rowset,
             const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, 
RowLocation>>>&
                     location_map);
-    Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) 
const;
     void sort_block(vectorized::Block& in_block, vectorized::Block& 
output_block);
 
     bool check_all_rowset_segment();
@@ -538,7 +524,7 @@ public:
 
     int64_t get_table_id() { return _tablet_meta->table_id(); }
 
-    // binlog releated functions
+    // binlog related functions
     bool is_enable_binlog();
     bool is_binlog_enabled() { return 
_tablet_meta->binlog_config().is_enable(); }
     int64_t binlog_ttl_ms() const { return 
_tablet_meta->binlog_config().ttl_seconds(); }
@@ -560,7 +546,6 @@ public:
 
 private:
     Status _init_once_action();
-    void _print_missed_versions(const std::vector<Version>& missed_versions) 
const;
     bool _contains_rowset(const RowsetId rowset_id);
     Status _contains_version(const Version& version);
 
@@ -758,7 +743,7 @@ inline Version Tablet::max_version() const {
 inline uint64_t Tablet::segment_count() const {
     std::shared_lock rdlock(_meta_lock);
     uint64_t segment_nums = 0;
-    for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
+    for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
         segment_nums += rs_meta->num_segments();
     }
     return segment_nums;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 19cfc6fa964..df9a45b03c8 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -146,8 +146,8 @@ Status TabletManager::_add_tablet_unlocked(TTabletId 
tablet_id, const TabletShar
     int32_t old_version, new_version;
     {
         std::shared_lock rdlock(existed_tablet->get_header_lock());
-        const RowsetSharedPtr old_rowset = 
existed_tablet->rowset_with_max_version();
-        const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version();
+        const RowsetSharedPtr old_rowset = 
existed_tablet->get_rowset_with_max_version();
+        const RowsetSharedPtr new_rowset = 
tablet->get_rowset_with_max_version();
         // If new tablet is empty, it is a newly created schema change tablet.
         // the old tablet is dropped before add tablet. it should not exist 
old tablet
         if (new_rowset == nullptr) {
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 64bf7d4c198..2d131afa870 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -295,6 +295,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t 
partition_id, int64_t tablet_id
 
 TabletMeta::TabletMeta(const TabletMeta& b)
         : _table_id(b._table_id),
+          _index_id(b._index_id),
           _partition_id(b._partition_id),
           _tablet_id(b._tablet_id),
           _replica_id(b._replica_id),
@@ -500,6 +501,7 @@ Status TabletMeta::deserialize(const string& meta_binary) {
 
 void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
     _table_id = tablet_meta_pb.table_id();
+    _index_id = tablet_meta_pb.index_id();
     _partition_id = tablet_meta_pb.partition_id();
     _tablet_id = tablet_meta_pb.tablet_id();
     _replica_id = tablet_meta_pb.replica_id();
@@ -614,6 +616,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& 
tablet_meta_pb) {
 
 void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
     tablet_meta_pb->set_table_id(table_id());
+    tablet_meta_pb->set_index_id(index_id());
     tablet_meta_pb->set_partition_id(partition_id());
     tablet_meta_pb->set_tablet_id(tablet_id());
     tablet_meta_pb->set_replica_id(replica_id());
@@ -857,6 +860,7 @@ Status TabletMeta::set_partition_id(int64_t partition_id) {
 
 bool operator==(const TabletMeta& a, const TabletMeta& b) {
     if (a._table_id != b._table_id) return false;
+    if (a._index_id != b._index_id) return false;
     if (a._partition_id != b._partition_id) return false;
     if (a._tablet_id != b._tablet_id) return false;
     if (a._replica_id != b._replica_id) return false;
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 094bb21507d..d5d1322d341 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -141,6 +141,7 @@ public:
     TabletUid tablet_uid() const;
     void set_tablet_uid(TabletUid uid) { _tablet_uid = uid; }
     int64_t table_id() const;
+    int64_t index_id() const;
     int64_t partition_id() const;
     int64_t tablet_id() const;
     int64_t replica_id() const;
@@ -272,6 +273,7 @@ private:
 
 private:
     int64_t _table_id = 0;
+    int64_t _index_id = 0;
     int64_t _partition_id = 0;
     int64_t _tablet_id = 0;
     int64_t _replica_id = 0;
@@ -526,6 +528,10 @@ inline int64_t TabletMeta::table_id() const {
     return _table_id;
 }
 
+inline int64_t TabletMeta::index_id() const {
+    return _index_id;
+}
+
 inline int64_t TabletMeta::partition_id() const {
     return _partition_id;
 }
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index d8c7a54cb74..8d1d9c966ba 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -188,7 +188,7 @@ Status EngineCloneTask::_do_clone() {
     }
     bool is_new_tablet = tablet == nullptr;
     // try to incremental clone
-    std::vector<Version> missed_versions;
+    Versions missed_versions;
     // try to repair a tablet with missing version
     if (tablet != nullptr) {
         std::shared_lock migration_rlock(tablet->get_migration_lock(), 
std::try_to_lock);
@@ -218,7 +218,7 @@ Status EngineCloneTask::_do_clone() {
             }
         }
 
-        tablet->calc_missed_versions(specified_version, &missed_versions);
+        missed_versions = tablet->get_missed_versions(specified_version);
 
         // if missed version size is 0, then it is useless to clone from 
remote be, it means local data is
         // completed. Or remote be will just return header not the rowset 
files. clone will failed.
@@ -740,8 +740,7 @@ Status EngineCloneTask::_finish_incremental_clone(Tablet* 
tablet,
 
     /// Get missing versions again from local tablet.
     /// We got it before outside the lock, so it has to be got again.
-    std::vector<Version> missed_versions;
-    tablet->calc_missed_versions_unlocked(version, &missed_versions);
+    Versions missed_versions = tablet->get_missed_versions_unlocked(version);
     VLOG_NOTICE << "get missed versions again when finish incremental clone. "
                 << "tablet=" << tablet->tablet_id() << ", clone version=" << 
version
                 << ", missed_versions_size=" << missed_versions.size();
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp 
b/be/src/olap/task/engine_storage_migration_task.cpp
index efa85406c69..37857efe4df 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -69,7 +69,7 @@ Status EngineStorageMigrationTask::_get_versions(int32_t 
start_version, int32_t*
         return Status::NotSupported(
                 "currently not support migrate tablet with cooldowned remote 
data");
     }
-    const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
+    const RowsetSharedPtr last_version = 
_tablet->get_rowset_with_max_version();
     if (last_version == nullptr) {
         return Status::InternalError("failed to get rowset with max version, 
tablet={}",
                                      _tablet->tablet_id());
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index c48ebf98a48..4e6553975fa 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -658,8 +658,8 @@ TEST_F(TestDeltaWriter, vec_write) {
     std::cout << "before publish, tablet row nums:" << tablet->num_rows() << 
std::endl;
     OlapMeta* meta = tablet->data_dir()->get_meta();
     Version version;
-    version.first = tablet->rowset_with_max_version()->end_version() + 1;
-    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
     std::cout << "start to add rowset version:" << version.first << "-" << 
version.second
               << std::endl;
     std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -751,8 +751,8 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     std::cout << "before publish, tablet row nums:" << tablet->num_rows() << 
std::endl;
     OlapMeta* meta = tablet->data_dir()->get_meta();
     Version version;
-    version.first = tablet->rowset_with_max_version()->end_version() + 1;
-    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
     std::cout << "start to add rowset version:" << version.first << "-" << 
version.second
               << std::endl;
     std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -899,8 +899,8 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
     // publish version on delta writer 1 success
     {
         Version version;
-        version.first = tablet->rowset_with_max_version()->end_version() + 1;
-        version.second = tablet->rowset_with_max_version()->end_version() + 1;
+        version.first = tablet->get_rowset_with_max_version()->end_version() + 
1;
+        version.second = tablet->get_rowset_with_max_version()->end_version() 
+ 1;
         std::cout << "start to add rowset version:" << version.first << "-" << 
version.second
                   << std::endl;
         std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -950,8 +950,8 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
         ASSERT_TRUE(res.ok());
 
         Version version;
-        version.first = tablet->rowset_with_max_version()->end_version() + 1;
-        version.second = tablet->rowset_with_max_version()->end_version() + 1;
+        version.first = tablet->get_rowset_with_max_version()->end_version() + 
1;
+        version.second = tablet->get_rowset_with_max_version()->end_version() 
+ 1;
         std::cout << "start to add rowset version:" << version.first << "-" << 
version.second
                   << std::endl;
         std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -982,7 +982,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
         ASSERT_EQ(1, segments.size());
     }
 
-    auto cur_version = tablet->rowset_with_max_version()->end_version();
+    auto cur_version = tablet->get_rowset_with_max_version()->end_version();
     // read data from rowset 1, verify the data correct
     {
         OlapReaderStatistics stats;
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index c65340522c6..46b4f18c28e 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -218,8 +218,8 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
     TabletSharedPtr tablet = 
k_engine->tablet_manager()->get_tablet(write_req.tablet_id);
     OlapMeta* meta = tablet->data_dir()->get_meta();
     Version version;
-    version.first = tablet->rowset_with_max_version()->end_version() + 1;
-    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
     std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
     StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
             write_req.txn_id, write_req.partition_id, &tablet_related_rs);
diff --git a/be/test/olap/remote_rowset_gc_test.cpp 
b/be/test/olap/remote_rowset_gc_test.cpp
index fe8ab3b0f10..19b02dca9c4 100644
--- a/be/test/olap/remote_rowset_gc_test.cpp
+++ b/be/test/olap/remote_rowset_gc_test.cpp
@@ -211,8 +211,8 @@ TEST_F(RemoteRowsetGcTest, normal) {
             k_engine->tablet_manager()->get_tablet(write_req.tablet_id, 
write_req.schema_hash);
     OlapMeta* meta = tablet->data_dir()->get_meta();
     Version version;
-    version.first = tablet->rowset_with_max_version()->end_version() + 1;
-    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
     std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
     StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
             write_req.txn_id, write_req.partition_id, &tablet_related_rs);
diff --git a/be/test/olap/tablet_cooldown_test.cpp 
b/be/test/olap/tablet_cooldown_test.cpp
index d83ea3eb016..5f307611abe 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -384,8 +384,8 @@ void createTablet(TabletSharedPtr* tablet, int64_t 
replica_id, int32_t schema_ha
     *tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id, 
write_req.schema_hash);
     OlapMeta* meta = (*tablet)->data_dir()->get_meta();
     Version version;
-    version.first = (*tablet)->rowset_with_max_version()->end_version() + 1;
-    version.second = (*tablet)->rowset_with_max_version()->end_version() + 1;
+    version.first = (*tablet)->get_rowset_with_max_version()->end_version() + 
1;
+    version.second = (*tablet)->get_rowset_with_max_version()->end_version() + 
1;
     std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
     k_engine->txn_manager()->get_txn_related_tablets(write_req.txn_id, 
write_req.partition_id,
                                                      &tablet_related_rs);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to