This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 06cbb96e79d [feature](merge-cloud) cloud meta mgr impl prepare/commit/sync rowset (#30128) 06cbb96e79d is described below commit 06cbb96e79d2c782b3533f0f236da7db9ab3455f Author: walter <w41te...@gmail.com> AuthorDate: Tue Jan 23 18:56:54 2024 +0800 [feature](merge-cloud) cloud meta mgr impl prepare/commit/sync rowset (#30128) --- be/src/cloud/cloud_meta_mgr.cpp | 345 ++++++++++++++++++++- be/src/cloud/cloud_meta_mgr.h | 2 +- be/src/cloud/cloud_tablet.cpp | 54 +--- be/src/cloud/cloud_tablet.h | 22 +- be/src/common/status.h | 3 +- be/src/olap/base_tablet.cpp | 139 ++++++++- be/src/olap/base_tablet.h | 26 +- be/src/olap/compaction.cpp | 12 +- be/src/olap/rowset/rowset_meta.cpp | 18 +- be/src/olap/rowset/rowset_meta.h | 8 +- be/src/olap/rowset_builder.cpp | 2 +- be/src/olap/schema_change.cpp | 2 +- be/src/olap/single_replica_compaction.cpp | 9 +- be/src/olap/snapshot_manager.cpp | 2 +- be/src/olap/tablet.cpp | 138 +-------- be/src/olap/tablet.h | 23 +- be/src/olap/tablet_manager.cpp | 4 +- be/src/olap/tablet_meta.cpp | 4 + be/src/olap/tablet_meta.h | 6 + be/src/olap/task/engine_clone_task.cpp | 7 +- be/src/olap/task/engine_storage_migration_task.cpp | 2 +- be/test/olap/delta_writer_test.cpp | 18 +- .../olap/engine_storage_migration_task_test.cpp | 4 +- be/test/olap/remote_rowset_gc_test.cpp | 4 +- be/test/olap/tablet_cooldown_test.cpp | 4 +- 25 files changed, 588 insertions(+), 270 deletions(-) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index d6eb54e5c41..eb2802b99b0 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -38,6 +38,7 @@ #include "gen_cpp/cloud.pb.h" #include "gen_cpp/olap_file.pb.h" #include "olap/olap_common.h" +#include "olap/rowset/rowset.h" #include "olap/rowset/rowset_factory.h" #include "olap/tablet_meta.h" #include "runtime/stream_load/stream_load_context.h" @@ -137,7 +138,7 @@ private: auto channel = 
std::make_unique<brpc::Channel>(); Status s = init_channel(channel.get()); - if (UNLIKELY(!s.ok())) { + if (!s.ok()) [[unlikely]] { return s; } @@ -189,6 +190,8 @@ static std::string debug_info(const Request& req) { return fmt::format(" tablet_id={}", req.tablet_id()); } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest>) { return ""; + } else if constexpr (is_any_v<Request, CreateRowsetRequest>) { + return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id()); } else { static_assert(!sizeof(Request)); } @@ -224,7 +227,7 @@ static Status retry_rpc(std::string_view op_name, const Request& req, Response* cntl.set_max_retry(BRPC_RETRY_TIMES); res->Clear(); (stub.get()->*method)(&cntl, &req, res, nullptr); - if (UNLIKELY(cntl.Failed())) { + if (cntl.Failed()) [[unlikely]] { error_msg = cntl.ErrorText(); } else if (res->status().code() == MetaServiceCode::OK) { return Status::OK(); @@ -271,28 +274,339 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tab } Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data) { - return Status::NotSupported("CloudMetaMgr::sync_tablet_rowsets is not implemented"); + using namespace std::chrono; + + TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); + + std::shared_ptr<MetaService_Stub> stub; + RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub)); + + int tried = 0; + while (true) { + brpc::Controller cntl; + cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); + GetRowsetRequest req; + GetRowsetResponse resp; + + int64_t tablet_id = tablet->tablet_id(); + int64_t table_id = tablet->table_id(); + int64_t index_id = tablet->index_id(); + req.set_cloud_unique_id(config::cloud_unique_id); + auto* idx = req.mutable_idx(); + idx->set_tablet_id(tablet_id); + idx->set_table_id(table_id); + idx->set_index_id(index_id); + idx->set_partition_id(tablet->partition_id()); + { + std::shared_lock 
rlock(tablet->get_header_lock()); + req.set_start_version(tablet->local_max_version() + 1); + req.set_base_compaction_cnt(tablet->base_compaction_cnt()); + req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt()); + req.set_cumulative_point(tablet->cumulative_layer_point()); + } + req.set_end_version(-1); + VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString(); + + stub->get_rowset(&cntl, &req, &resp, nullptr); + int64_t latency = cntl.latency_us(); + g_get_rowset_latency << latency; + int retry_times = config::meta_service_rpc_retry_times; + if (cntl.Failed()) { + if (tried++ < retry_times) { + auto rng = make_random_engine(); + std::uniform_int_distribution<uint32_t> u(20, 200); + std::uniform_int_distribution<uint32_t> u1(500, 1000); + uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng); + std::this_thread::sleep_for(milliseconds(duration_ms)); + LOG_INFO("failed to get rowset meta") + .tag("reason", cntl.ErrorText()) + .tag("tablet_id", tablet_id) + .tag("table_id", table_id) + .tag("index_id", index_id) + .tag("partition_id", tablet->partition_id()) + .tag("tried", tried) + .tag("sleep", duration_ms); + continue; + } + return Status::RpcError("failed to get rowset meta: {}", cntl.ErrorText()); + } + if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { + return Status::NotFound("failed to get rowset meta: {}", resp.status().msg()); + } + if (resp.status().code() != MetaServiceCode::OK) { + return Status::InternalError("failed to get rowset meta: {}", resp.status().msg()); + } + if (latency > 100 * 1000) { // 100ms + LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size() + << ", latency=" << latency << "us"; + } else { + LOG_EVERY_N(INFO, 100) + << "finish get_rowset rpc. 
rowset_meta.size()=" << resp.rowset_meta().size() + << ", latency=" << latency << "us"; + } + + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + tablet->last_sync_time_s = now; + + if (tablet->enable_unique_key_merge_on_write()) { + DeleteBitmap delete_bitmap(tablet_id); + int64_t old_max_version = req.start_version() - 1; + auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), + resp.stats(), req.idx(), &delete_bitmap); + if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) { + LOG_WARNING("rowset meta is expired, need to retry") + .tag("tablet", tablet->tablet_id()) + .tag("tried", tried) + .error(st); + continue; + } + if (!st.ok()) { + LOG_WARNING("failed to get delete bimtap") + .tag("tablet", tablet->tablet_id()) + .error(st); + return st; + } + tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap); + } + { + const auto& stats = resp.stats(); + std::unique_lock wlock(tablet->get_header_lock()); + + // ATTN: we are facing following data race + // + // resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8 + // + // BE-compaction-thread meta-service BE-query-thread + // | | | + // local | commit cumu-compaction | | + // cc_cnt=0 | ---------------------------> | sync rowset (long rpc, local cc_cnt=0 ) | local + // | | <----------------------------------------- | cc_cnt=0 + // | | -. 
| + // local | done cc_cnt=1 | \ | + // cc_cnt=1 | <--------------------------- | \ | + // | | \ returned with resp cc_cnt=0 (snapshot) | + // | | '------------------------------------> | local + // | | | cc_cnt=1 + // | | | + // | | | CHECK FAIL + // | | | need retry + // To get rid of just retry syncing tablet + if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() || + stats.cumulative_compaction_cnt() < tablet->cumulative_compaction_cnt()) + [[unlikely]] { + // stale request, ignore + LOG_WARNING("stale get rowset meta request") + .tag("resp_base_compaction_cnt", stats.base_compaction_cnt()) + .tag("base_compaction_cnt", tablet->base_compaction_cnt()) + .tag("resp_cumulative_compaction_cnt", stats.cumulative_compaction_cnt()) + .tag("cumulative_compaction_cnt", tablet->cumulative_compaction_cnt()) + .tag("tried", tried); + if (tried++ < 10) continue; + return Status::OK(); + } + std::vector<RowsetSharedPtr> rowsets; + rowsets.reserve(resp.rowset_meta().size()); + for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) { + VLOG_DEBUG << "get rowset meta, tablet_id=" << cloud_rs_meta_pb.tablet_id() + << ", version=[" << cloud_rs_meta_pb.start_version() << '-' + << cloud_rs_meta_pb.end_version() << ']'; + auto existed_rowset = tablet->get_rowset_by_version( + {cloud_rs_meta_pb.start_version(), cloud_rs_meta_pb.end_version()}); + if (existed_rowset && + existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) { + continue; // Same rowset, skip it + } + RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb); + auto rs_meta = std::make_shared<RowsetMeta>(); + rs_meta->init_from_pb(meta_pb); + RowsetSharedPtr rowset; + // schema is nullptr implies using RowsetMeta.tablet_schema + Status s = RowsetFactory::create_rowset(nullptr, tablet->tablet_path(), rs_meta, + &rowset); + if (!s.ok()) { + LOG_WARNING("create rowset").tag("status", s); + return s; + } + rowsets.push_back(std::move(rowset)); + } + if (!rowsets.empty()) { + // 
`rowsets.empty()` could happen after doing EMPTY_CUMULATIVE compaction. e.g.: + // BE has [0-1][2-11][12-12], [12-12] is delete predicate, cp is 2; + // after doing EMPTY_CUMULATIVE compaction, MS cp is 13, get_rowset will return [2-11][12-12]. + bool version_overlap = + tablet->local_max_version() >= rowsets.front()->start_version(); + tablet->add_rowsets(std::move(rowsets), version_overlap, wlock, warmup_delta_data); + } + tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms(); + tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms(); + tablet->set_base_compaction_cnt(stats.base_compaction_cnt()); + tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); + tablet->set_cumulative_layer_point(stats.cumulative_point()); + tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), + stats.num_rows(), stats.data_size()); + } + return Status::OK(); + } } Status CloudMetaMgr::sync_tablet_delete_bitmap( CloudTablet* tablet, int64_t old_max_version, - const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas, + const google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>& rs_metas, const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap) { - return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is not implemented"); + if (rs_metas.empty()) { + return Status::OK(); + } + + std::shared_ptr<MetaService_Stub> stub; + RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub)); + + int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version()); + brpc::Controller cntl; + // When there are many delete bitmaps that need to be synchronized, it + // may take a longer time, especially when loading the tablet for the + // first time, so set a relatively long timeout time. 
+ cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); + GetDeleteBitmapRequest req; + GetDeleteBitmapResponse res; + req.set_cloud_unique_id(config::cloud_unique_id); + req.set_tablet_id(tablet->tablet_id()); + req.set_base_compaction_cnt(stats.base_compaction_cnt()); + req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); + req.set_cumulative_point(stats.cumulative_point()); + *(req.mutable_idx()) = idx; + // New rowset sync all versions of delete bitmap + for (const auto& rs_meta : rs_metas) { + req.add_rowset_ids(rs_meta.rowset_id_v2()); + req.add_begin_versions(0); + req.add_end_versions(new_max_version); + } + + // old rowset sync incremental versions of delete bitmap + if (old_max_version > 0 && old_max_version < new_max_version) { + RowsetIdUnorderedSet all_rs_ids; + RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); + for (const auto& rs_id : all_rs_ids) { + req.add_rowset_ids(rs_id.to_string()); + req.add_begin_versions(old_max_version + 1); + req.add_end_versions(new_max_version); + } + } + + VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString(); + stub->get_delete_bitmap(&cntl, &req, &res, nullptr); + if (cntl.Failed()) { + return Status::RpcError("failed to get delete bitmap: {}", cntl.ErrorText()); + } + if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { + return Status::NotFound("failed to get delete bitmap: {}", res.status().msg()); + } + // The delete bitmap of stale rowsets will be removed when commit compaction job, + // then delete bitmap of stale rowsets cannot be obtained. But the rowsets obtained + // by sync_tablet_rowsets may include these stale rowsets. When this case happend, the + // error code of ROWSETS_EXPIRED will be returned, we need to retry sync rowsets again. 
+ // + // Be query thread meta-service Be compaction thread + // | | | + // | get rowset | | + // |--------------------------->| | + // | return get rowset | | + // |<---------------------------| | + // | | commit job | + // | |<------------------------| + // | | return commit job | + // | |------------------------>| + // | get delete bitmap | | + // |--------------------------->| | + // | return get delete bitmap | | + // |<---------------------------| | + // | | | + if (res.status().code() == MetaServiceCode::ROWSETS_EXPIRED) { + return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get delete bitmap: {}", + res.status().msg()); + } + if (res.status().code() != MetaServiceCode::OK) { + return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get delete bitmap: {}", + res.status().msg()); + } + const auto& rowset_ids = res.rowset_ids(); + const auto& segment_ids = res.segment_ids(); + const auto& vers = res.versions(); + const auto& delete_bitmaps = res.segment_delete_bitmaps(); + for (size_t i = 0; i < rowset_ids.size(); i++) { + RowsetId rst_id; + rst_id.init(rowset_ids[i]); + delete_bitmap->merge({rst_id, segment_ids[i], vers[i]}, + roaring::Roaring::read(delete_bitmaps[i].data())); + } + return Status::OK(); } Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp, RowsetMetaSharedPtr* existed_rs_meta) { - return Status::NotSupported("CloudMetaMgr::prepare_rowset is not implemented"); + VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id() + << ", rowset_id: " << rs_meta.rowset_id() << ", is_tmp: " << is_tmp; + + CreateRowsetRequest req; + CreateRowsetResponse resp; + req.set_cloud_unique_id(config::cloud_unique_id); + req.set_temporary(is_tmp); + + RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true); + rs_meta.to_rowset_pb(&doris_rs_meta, true); + doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(doris_rs_meta)); + + Status st = retry_rpc("prepare rowset", req, &resp, 
&MetaService_Stub::prepare_rowset); + if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) { + if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) { + RowsetMetaPB doris_rs_meta = + cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta())); + *existed_rs_meta = std::make_shared<RowsetMeta>(); + (*existed_rs_meta)->init_from_pb(doris_rs_meta); + } + return Status::AlreadyExist("failed to prepare rowset: {}", resp.status().msg()); + } + return st; } Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, bool is_tmp, RowsetMetaSharedPtr* existed_rs_meta) { - return Status::NotSupported("CloudMetaMgr::commit_rowset is not implemented"); + VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id() + << ", rowset_id: " << rs_meta.rowset_id() << ", is_tmp: " << is_tmp; + CreateRowsetRequest req; + CreateRowsetResponse resp; + req.set_cloud_unique_id(config::cloud_unique_id); + req.set_temporary(is_tmp); + + RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(); + doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); + Status st = retry_rpc("commit rowset", req, &resp, &MetaService_Stub::commit_rowset); + if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) { + if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) { + RowsetMetaPB doris_rs_meta = + cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta())); + *existed_rs_meta = std::make_shared<RowsetMeta>(); + (*existed_rs_meta)->init_from_pb(doris_rs_meta); + } + return Status::AlreadyExist("failed to commit rowset: {}", resp.status().msg()); + } + return st; } Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) { - return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not implemented"); + VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id() + << ", rowset_id: " << rs_meta.rowset_id(); + CreateRowsetRequest req; + CreateRowsetResponse resp; + 
req.set_cloud_unique_id(config::cloud_unique_id); + + RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(true); + doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); + Status st = + retry_rpc("update committed rowset", req, &resp, &MetaService_Stub::update_tmp_rowset); + if (!st.ok() && resp.status().code() == MetaServiceCode::ROWSET_META_NOT_FOUND) { + return Status::InternalError("failed to update committed rowset: {}", resp.status().msg()); + } + return st; } Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { @@ -436,15 +750,14 @@ Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t loc req.set_tablet_id(tablet.tablet_id()); req.set_lock_id(lock_id); req.set_initiator(initiator); - for (auto iter = delete_bitmap->delete_bitmap.begin(); - iter != delete_bitmap->delete_bitmap.end(); ++iter) { - req.add_rowset_ids(std::get<0>(iter->first).to_string()); - req.add_segment_ids(std::get<1>(iter->first)); - req.add_versions(std::get<2>(iter->first)); + for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) { + req.add_rowset_ids(std::get<0>(key).to_string()); + req.add_segment_ids(std::get<1>(key)); + req.add_versions(std::get<2>(key)); // To save space, convert array and bitmap containers to run containers - iter->second.runOptimize(); - std::string bitmap_data(iter->second.getSizeInBytes(), '\0'); - iter->second.write(bitmap_data.data()); + bitmap.runOptimize(); + std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); + bitmap.write(bitmap_data.data()); *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); } auto st = retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap); diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index af5b048b2f0..59f11c17152 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -88,7 +88,7 @@ public: private: Status sync_tablet_delete_bitmap( CloudTablet* tablet, int64_t 
old_max_version, - const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas, + const google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>& rs_metas, const TabletStatsPB& stas, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap); }; diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 6beb1e45d94..c83731fe83f 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -53,7 +53,7 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version, if (!st.ok()) { rlock.unlock(); // avoid logging in lock range // Check no missed versions or req version is merged - auto missed_versions = calc_missed_versions(spec_version.second); + auto missed_versions = get_missed_versions(spec_version.second); if (missed_versions.empty()) { st.set_code(VERSION_ALREADY_MERGED); // Reset error code } @@ -67,50 +67,6 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version, return capture_rs_readers_unlocked(version_path, rs_splits); } -// for example: -// [0-4][5-5][8-8][9-9][13-13] -// if spec_version = 12, it will return [6-7],[10-12] -Versions CloudTablet::calc_missed_versions(int64_t spec_version) { - DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; - - Versions missed_versions; - Versions existing_versions; - { - std::shared_lock rdlock(_meta_lock); - for (const auto& rs : _tablet_meta->all_rs_metas()) { - existing_versions.emplace_back(rs->version()); - } - } - - // sort the existing versions in ascending order - std::sort(existing_versions.begin(), existing_versions.end(), - [](const Version& a, const Version& b) { - // simple because 2 versions are certainly not overlapping - return a.first < b.first; - }); - - auto min_version = existing_versions.front().first; - if (min_version > 0) { - missed_versions.emplace_back(0, std::min(spec_version, min_version - 1)); - } - for (auto it = existing_versions.begin(); it != existing_versions.end() - 1; ++it) { - auto prev_v = it->second; - 
if (prev_v >= spec_version) { - return missed_versions; - } - auto next_v = (it + 1)->first; - if (next_v > prev_v + 1) { - // there is a hole between versions - missed_versions.emplace_back(prev_v + 1, std::min(spec_version, next_v - 1)); - } - } - auto max_version = existing_versions.back().second; - if (max_version < spec_version) { - missed_versions.emplace_back(max_version + 1, spec_version); - } - return missed_versions; -} - Status CloudTablet::sync_meta() { // TODO(lightman): FileCache return Status::NotSupported("CloudTablet::sync_meta is not implemented"); @@ -451,4 +407,12 @@ void CloudTablet::get_compaction_status(std::string* json_result) { *json_result = std::string(strbuf.GetString()); } +void CloudTablet::set_cumulative_layer_point(int64_t new_point) { + // cumulative point should only be reset to -1, or be increased + CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) + << "Unexpected cumulative point: " << new_point + << ", origin: " << _cumulative_point.load(); + _cumulative_point = new_point; +} + } // namespace doris diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index bf8db3c9451..bfd22d2f54c 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -20,7 +20,6 @@ #include <atomic> #include "olap/base_tablet.h" -#include "olap/version_graph.h" namespace doris { @@ -64,7 +63,7 @@ public: // If tablet state is not `TABLET_RUNNING`, sync tablet meta and all visible rowsets. // If `query_version` > 0 and local max_version of the tablet >= `query_version`, do nothing. // If 'need_download_data_async' is true, it means that we need to download the new version - // rowsets datas async. + // rowset data asynchronously. Status sync_rowsets(int64_t query_version = -1, bool warmup_delta_data = false); // Synchronize the tablet meta from meta service. 
@@ -74,7 +73,7 @@ public: // If 'warmup_delta_data' is true, download the new version rowset data in background.
// MUST hold EXCLUSIVE `_meta_lock`. // If 'need_download_data_async' is true, it means that we need to download the new version - // rowsets datas async. + // rowset data asynchronously. void add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap, std::unique_lock<std::shared_mutex>& meta_lock, bool warmup_delta_data = false); @@ -98,6 +97,17 @@ public: int64_t get_cloud_base_compaction_score() const; int64_t get_cloud_cumu_compaction_score() const; + int64_t local_max_version() const { return _max_version; } + int64_t base_compaction_cnt() const { return _base_compaction_cnt; } + int64_t cumulative_compaction_cnt() const { return _cumulative_compaction_cnt; } + int64_t cumulative_layer_point() const { + return _cumulative_point.load(std::memory_order_relaxed); + } + + void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; } + void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; } + void set_cumulative_layer_point(int64_t new_point); + int64_t last_sync_time_s = 0; int64_t last_load_time_ms = 0; int64_t last_base_compaction_success_time_ms = 0; @@ -105,8 +115,6 @@ public: int64_t last_cumu_no_suitable_version_ms = 0; private: - Versions calc_missed_versions(int64_t spec_version); - // FIXME(plat1ko): No need to record base size if rowsets are ordered by version void update_base_size(const Rowset& rs); @@ -126,8 +134,8 @@ private: // Number of sorted arrays (e.g.
for rowset with N segments, if rowset is overlapping, delta is N, otherwise 1) after cumu point std::atomic<int64_t> _approximate_cumu_num_deltas {-1}; - [[maybe_unused]] int64_t _base_compaction_cnt = 0; - [[maybe_unused]] int64_t _cumulative_compaction_cnt = 0; + int64_t _base_compaction_cnt = 0; + int64_t _cumulative_compaction_cnt = 0; int64_t _max_version = -1; int64_t _base_size = 0; }; diff --git a/be/src/common/status.h b/be/src/common/status.h index a4ab9f60b0f..3fcd0410090 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -276,7 +276,8 @@ namespace ErrorCode { E(KEY_NOT_FOUND, -7000, false); \ E(KEY_ALREADY_EXISTS, -7001, false); \ E(ENTRY_NOT_FOUND, -7002, false); \ - E(INVALID_TABLET_STATE, -7211, false); + E(INVALID_TABLET_STATE, -7211, false); \ + E(ROWSETS_EXPIRED, -7311, false); // Define constexpr int error_code_name = error_code_value #define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE; diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 18445cb17a6..8612e4d83bd 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -22,7 +22,6 @@ #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_reader.h" #include "olap/tablet_fwd.h" -#include "olap/tablet_schema_cache.h" #include "util/doris_metrics.h" #include "vec/common/schema_util.h" @@ -81,7 +80,7 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_ return Status::OK(); } -Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>& version_path, +Status BaseTablet::capture_rs_readers_unlocked(const Versions& version_path, std::vector<RowSetSplits>* rs_splits) const { DCHECK(rs_splits != nullptr && rs_splits->empty()); for (auto version : version_path) { @@ -104,11 +103,143 @@ Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>& versi return Status::Error<CAPTURE_ROWSET_READER_ERROR>( "failed to create reader for rowset:{}", 
it->second->rowset_id().to_string()); } - rs_splits->push_back(RowSetSplits(std::move(rs_reader))); + rs_splits->emplace_back(std::move(rs_reader)); } return Status::OK(); } +// snapshot manager may call this api to check if version exists, so that +// the version maybe not exist +RowsetSharedPtr BaseTablet::get_rowset_by_version(const Version& version, + bool find_in_stale) const { + auto iter = _rs_version_map.find(version); + if (iter == _rs_version_map.end()) { + if (find_in_stale) { + return get_stale_rowset_by_version(version); + } + return nullptr; + } + return iter->second; +} + +RowsetSharedPtr BaseTablet::get_stale_rowset_by_version(const Version& version) const { + auto iter = _stale_rs_version_map.find(version); + if (iter == _stale_rs_version_map.end()) { + VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << tablet_id(); + return nullptr; + } + return iter->second; +} + +// Already under _meta_lock +RowsetSharedPtr BaseTablet::get_rowset_with_max_version() const { + Version max_version = _tablet_meta->max_version(); + if (max_version.first == -1) { + return nullptr; + } + + auto iter = _rs_version_map.find(max_version); + if (iter == _rs_version_map.end()) { + DCHECK(false) << "invalid version:" << max_version; + return nullptr; + } + return iter->second; +} + +Status BaseTablet::get_all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { + std::shared_lock rlock(_meta_lock); + return get_all_rs_id_unlocked(max_version, rowset_ids); +} + +Status BaseTablet::get_all_rs_id_unlocked(int64_t max_version, + RowsetIdUnorderedSet* rowset_ids) const { + // Ensure that the obtained versions of rowsets are continuous + Version spec_version(0, max_version); + Versions version_path; + auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); + if (!st.ok()) [[unlikely]] { + return st; + } + + for (auto& ver : version_path) { + if (ver.second == 1) { + // [0-1] rowset is empty for each 
tablet, skip it + continue; + } + auto it = _rs_version_map.find(ver); + if (it == _rs_version_map.end()) { + return Status::Error<CAPTURE_ROWSET_ERROR, false>( + "fail to find Rowset for version. tablet={}, version={}", tablet_id(), + ver.to_string()); + } + rowset_ids->emplace(it->second->rowset_id()); + } + return Status::OK(); +} + +Versions BaseTablet::get_missed_versions(int64_t spec_version) const { + DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; + + Versions existing_versions; + { + std::shared_lock rdlock(_meta_lock); + for (const auto& rs : _tablet_meta->all_rs_metas()) { + existing_versions.emplace_back(rs->version()); + } + } + return calc_missed_versions(spec_version, std::move(existing_versions)); +} + +Versions BaseTablet::get_missed_versions_unlocked(int64_t spec_version) const { + DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; + + Versions existing_versions; + for (const auto& rs : _tablet_meta->all_rs_metas()) { + existing_versions.emplace_back(rs->version()); + } + return calc_missed_versions(spec_version, std::move(existing_versions)); +} + +Versions BaseTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) { + DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; + + // sort the existing versions in ascending order + std::sort(existing_versions.begin(), existing_versions.end(), + [](const Version& a, const Version& b) { + // simple because 2 versions are certainly not overlapping + return a.first < b.first; + }); + + // From the first version(=0), find the missing version until spec_version + int64_t last_version = -1; + Versions missed_versions; + for (const Version& version : existing_versions) { + if (version.first > last_version + 1) { + // hole [last_version + 1, version.first - 1], clipped to spec_version + missed_versions.emplace_back(last_version + 1, std::min(version.first - 1, spec_version)); + } + last_version = 
version.second; + if (last_version >= spec_version) { + break; + } + } + if (last_version <
spec_version) { + // there is a hole between the last version and the specified version. + missed_versions.emplace_back(last_version + 1, spec_version); + } + return missed_versions; +} + +void BaseTablet::_print_missed_versions(const Versions& missed_versions) const { + std::stringstream ss; + ss << tablet_id() << " has " << missed_versions.size() << " missed version:"; + // print at most 10 versions + for (size_t i = 0; i < 10 && i < missed_versions.size(); ++i) { + ss << missed_versions[i] << ","; + } + LOG(WARNING) << ss.str(); +} + bool BaseTablet::_reconstruct_version_tracker_if_necessary() { double orphan_vertex_ratio = _timestamped_version_tracker.get_orphan_vertex_ratio(); if (orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio) { @@ -119,4 +250,4 @@ bool BaseTablet::_reconstruct_version_tracker_if_necessary() { return false; } -} /* namespace doris */ +} // namespace doris diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index bb327b39532..25c54c47cda 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -44,6 +44,7 @@ public: TabletState tablet_state() const { return _tablet_meta->tablet_state(); } Status set_tablet_state(TabletState state); int64_t table_id() const { return _tablet_meta->table_id(); } + int64_t index_id() const { return _tablet_meta->index_id(); } int64_t partition_id() const { return _tablet_meta->partition_id(); } int64_t tablet_id() const { return _tablet_meta->tablet_id(); } int32_t schema_hash() const { return _tablet_meta->schema_hash(); } @@ -85,16 +86,37 @@ public: virtual size_t tablet_footprint() = 0; // MUST hold shared meta lock - Status capture_rs_readers_unlocked(const std::vector<Version>& version_path, + Status capture_rs_readers_unlocked(const Versions& version_path, std::vector<RowSetSplits>* rs_splits) const; + // _rs_version_map and _stale_rs_version_map should be protected by _meta_lock + // The caller must hold _meta_lock when calling these three functions.
+ RowsetSharedPtr get_rowset_by_version(const Version& version, bool find_in_stale = false) const; + RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const; + RowsetSharedPtr get_rowset_with_max_version() const; + + Status get_all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const; + Status get_all_rs_id_unlocked(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const; + + // Get the missed versions until the spec_version. + Versions get_missed_versions(int64_t spec_version) const; + Versions get_missed_versions_unlocked(int64_t spec_version) const; + protected: + // Find the missed versions until the spec_version. + // + // for example: + // [0-4][5-5][8-8][9-9][14-14] + // if spec_version = 12, it will return [6-7],[10-12] + static Versions calc_missed_versions(int64_t spec_version, Versions existing_versions); + + void _print_missed_versions(const Versions& missed_versions) const; bool _reconstruct_version_tracker_if_necessary(); mutable std::shared_mutex _meta_lock; TimestampedVersionTracker _timestamped_version_tracker; // After version 0.13, all newly created rowsets are saved in _rs_version_map. - // And if rowset being compacted, the old rowsetis will be saved in _stale_rs_version_map; + // And if rowset being compacted, the old rowsets will be saved in _stale_rs_version_map; std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _rs_version_map; // This variable _stale_rs_version_map is used to record these rowsets which are be compacted.
// These _stale rowsets are been removed when rowsets' pathVersion is expired, diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index bc84bbcb84e..53d68c56647 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -560,18 +560,16 @@ Status Compaction::do_compaction_impl(int64_t permits) { _tablet->set_last_full_compaction_success_time(now); } - int64_t current_max_version; + int64_t current_max_version = -1; { std::shared_lock rdlock(_tablet->get_header_lock()); - RowsetSharedPtr max_rowset = _tablet->rowset_with_max_version(); - if (max_rowset == nullptr) { - current_max_version = -1; - } else { - current_max_version = _tablet->rowset_with_max_version()->end_version(); + current_max_version = -1; + if (RowsetSharedPtr max_rowset = _tablet->get_rowset_with_max_version()) { + current_max_version = max_rowset->end_version(); } } - auto cumu_policy = _tablet->cumulative_compaction_policy(); + auto* cumu_policy = _tablet->cumulative_compaction_policy(); DCHECK(cumu_policy); LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << vertical_compaction << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp index 7f4798f97e9..d762a438bf9 100644 --- a/be/src/olap/rowset/rowset_meta.cpp +++ b/be/src/olap/rowset/rowset_meta.cpp @@ -102,18 +102,20 @@ void RowsetMeta::set_fs(io::FileSystemSPtr fs) { _fs = std::move(fs); } -void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const { +void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema) const { *rs_meta_pb = _rowset_meta_pb; - if (_schema) { - _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema()); + if (_schema) [[likely]] { + rs_meta_pb->set_schema_version(_schema->schema_version()); + if (!skip_schema) { + // For cloud, separate tablet schema from rowset meta to reduce persistent size. 
+ _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema()); + } } } -RowsetMetaPB RowsetMeta::get_rowset_pb() { - RowsetMetaPB rowset_meta_pb = _rowset_meta_pb; - if (_schema) { - _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema()); - } +RowsetMetaPB RowsetMeta::get_rowset_pb(bool skip_schema) const { + RowsetMetaPB rowset_meta_pb; + to_rowset_pb(&rowset_meta_pb, skip_schema); return rowset_meta_pb; } diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 7e1dfaa57c3..d2180a46329 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -197,9 +197,11 @@ public: void set_num_segments(int64_t num_segments) { _rowset_meta_pb.set_num_segments(num_segments); } - void to_rowset_pb(RowsetMetaPB* rs_meta_pb) const; + // Convert to RowsetMetaPB, skip_schema is only used by cloud to separate schema from rowset meta. + void to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema = false) const; - RowsetMetaPB get_rowset_pb(); + // Convert to RowsetMetaPB, skip_schema is only used by cloud to separate schema from rowset meta. 
+ RowsetMetaPB get_rowset_pb(bool skip_schema = false) const; inline DeletePredicatePB* mutable_delete_pred_pb() { return _rowset_meta_pb.mutable_delete_predicate(); @@ -302,7 +304,7 @@ public: void set_tablet_schema(const TabletSchemaSPtr& tablet_schema); void set_tablet_schema(const TabletSchemaPB& tablet_schema); - const TabletSchemaSPtr& tablet_schema() { return _schema; } + const TabletSchemaSPtr& tablet_schema() const { return _schema; } // Because the member field '_handle' is a raw pointer, use member func 'init' to replace copy ctor RowsetMeta(const RowsetMeta&) = delete; diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index e7fb520c0bf..0c951405b28 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -128,7 +128,7 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context) } _rowset_ids.clear(); } else { - RETURN_IF_ERROR(tablet()->all_rs_id(cur_max_version, &_rowset_ids)); + RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version, &_rowset_ids)); } _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id()); mow_context = diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 327afd6a8c4..982a91c38e0 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -979,7 +979,7 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) { Status SchemaChangeHandler::_get_versions_to_be_changed( TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed, RowsetSharedPtr* max_rowset) { - RowsetSharedPtr rowset = base_tablet->rowset_with_max_version(); + RowsetSharedPtr rowset = base_tablet->get_rowset_with_max_version(); if (rowset == nullptr) { return Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>("Tablet has no version. 
base_tablet={}", base_tablet->tablet_id()); diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp index 72a0cdfc01c..7d78ae1006b 100644 --- a/be/src/olap/single_replica_compaction.cpp +++ b/be/src/olap/single_replica_compaction.cpp @@ -157,14 +157,11 @@ Status SingleReplicaCompaction::_do_single_replica_compaction_impl() { _tablet->set_last_full_compaction_success_time(UnixMillis()); } - int64_t current_max_version; + int64_t current_max_version = -1; { std::shared_lock rdlock(_tablet->get_header_lock()); - RowsetSharedPtr max_rowset = _tablet->rowset_with_max_version(); - if (max_rowset == nullptr) { - current_max_version = -1; - } else { - current_max_version = _tablet->rowset_with_max_version()->end_version(); + if (RowsetSharedPtr max_rowset = _tablet->get_rowset_with_max_version()) { + current_max_version = max_rowset->end_version(); } } diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index eb42ad4ac72..93b8c30ff54 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -492,7 +492,7 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet consistent_rowsets.clear(); // reset vector // get latest version - const RowsetSharedPtr last_version = ref_tablet->rowset_with_max_version(); + const RowsetSharedPtr last_version = ref_tablet->get_rowset_with_max_version(); if (last_version == nullptr) { res = Status::InternalError("tablet has not any version. 
path={}", ref_tablet->tablet_id()); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 50ec2304096..96f00ccd380 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -601,44 +601,6 @@ void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool } } -// snapshot manager may call this api to check if version exists, so that -// the version maybe not exist -const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version, - bool find_in_stale) const { - auto iter = _rs_version_map.find(version); - if (iter == _rs_version_map.end()) { - if (find_in_stale) { - return get_stale_rowset_by_version(version); - } - return nullptr; - } - return iter->second; -} - -const RowsetSharedPtr Tablet::get_stale_rowset_by_version(const Version& version) const { - auto iter = _stale_rs_version_map.find(version); - if (iter == _stale_rs_version_map.end()) { - VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << tablet_id(); - return nullptr; - } - return iter->second; -} - -// Already under _meta_lock -const RowsetSharedPtr Tablet::rowset_with_max_version() const { - Version max_version = _tablet_meta->max_version(); - if (max_version.first == -1) { - return nullptr; - } - - auto iter = _rs_version_map.find(max_version); - if (iter == _rs_version_map.end()) { - DCHECK(false) << "invalid version:" << max_version; - return nullptr; - } - return iter->second; -} - TabletSchemaSPtr Tablet::tablet_schema_with_merged_max_schema_version( const std::vector<RowsetMetaSharedPtr>& rowset_metas) { RowsetMetaSharedPtr max_schema_version_rs = *std::max_element( @@ -729,16 +691,14 @@ void Tablet::delete_expired_stale_rowset() { return; } - const RowsetSharedPtr lastest_delta = rowset_with_max_version(); + const RowsetSharedPtr lastest_delta = get_rowset_with_max_version(); if (lastest_delta == nullptr) { LOG(WARNING) << "lastest_delta is null " << tablet_id(); return; } // fetch missing version before delete - 
std::vector<Version> missed_versions; - calc_missed_versions_unlocked(lastest_delta->end_version(), &missed_versions); - + Versions missed_versions = get_missed_versions_unlocked(lastest_delta->end_version()); if (!missed_versions.empty()) { LOG(WARNING) << "tablet:" << tablet_id() << ", missed version for version:" << lastest_delta->end_version(); @@ -762,8 +722,8 @@ void Tablet::delete_expired_stale_rowset() { // 1. When there is no consistent versions, we must reconstruct the tracker. if (!status.ok()) { // 2. fetch missing version after delete - std::vector<Version> after_missed_versions; - calc_missed_versions_unlocked(lastest_delta->end_version(), &after_missed_versions); + Versions after_missed_versions = + get_missed_versions_unlocked(lastest_delta->end_version()); // 2.1 check whether missed_versions and after_missed_versions are the same. // when they are the same, it means we can delete the path securely. @@ -788,9 +748,8 @@ void Tablet::delete_expired_stale_rowset() { // 4. double check the consistent versions // fetch missing version after recover - std::vector<Version> recover_missed_versions; - calc_missed_versions_unlocked(lastest_delta->end_version(), - &recover_missed_versions); + Versions recover_missed_versions = + get_missed_versions_unlocked(lastest_delta->end_version()); // 4.1 check whether missed_versions and recover_missed_versions are the same. // when they are the same, it means we recover successfully. 
@@ -869,13 +828,12 @@ void Tablet::delete_expired_stale_rowset() { } Status Tablet::capture_consistent_versions_unlocked(const Version& spec_version, - std::vector<Version>* version_path, + Versions* version_path, bool skip_missing_version, bool quiet) const { Status status = _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); if (!status.ok() && !quiet) { - std::vector<Version> missed_versions; - calc_missed_versions_unlocked(spec_version.second, &missed_versions); + Versions missed_versions = get_missed_versions_unlocked(spec_version.second); if (missed_versions.empty()) { // if version_path is null, it may be a compaction check logic. // so to avoid print too many logs. @@ -1084,46 +1042,6 @@ uint32_t Tablet::_calc_base_compaction_score() const { return base_rowset_exist ? score : 0; } -void Tablet::calc_missed_versions(int64_t spec_version, std::vector<Version>* missed_versions) { - std::shared_lock rdlock(_meta_lock); - calc_missed_versions_unlocked(spec_version, missed_versions); -} - -// for example: -// [0-4][5-5][8-8][9-9] -// if spec_version = 6, we still return {7} other than {6, 7} -void Tablet::calc_missed_versions_unlocked(int64_t spec_version, - std::vector<Version>* missed_versions) const { - DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; - std::list<Version> existing_versions; - for (auto& rs : _tablet_meta->all_rs_metas()) { - existing_versions.emplace_back(rs->version()); - } - - // sort the existing versions in ascending order - existing_versions.sort([](const Version& a, const Version& b) { - // simple because 2 versions are certainly not overlapping - return a.first < b.first; - }); - - // From the first version(=0), find the missing version until spec_version - int64_t last_version = -1; - for (const Version& version : existing_versions) { - if (version.first > last_version + 1) { - for (int64_t i = last_version + 1; i < version.first && i <= spec_version; ++i) { - 
missed_versions->emplace_back(Version(i, i)); - } - } - last_version = version.second; - if (last_version >= spec_version) { - break; - } - } - for (int64_t i = last_version + 1; i <= spec_version; ++i) { - missed_versions->emplace_back(Version(i, i)); - } -} - void Tablet::max_continuous_version_from_beginning(Version* version, Version* max_version) { bool has_version_cross; std::shared_lock rdlock(_meta_lock); @@ -1224,16 +1142,6 @@ bool Tablet::check_path(const std::string& path_to_check) const { return false; } -void Tablet::_print_missed_versions(const std::vector<Version>& missed_versions) const { - std::stringstream ss; - ss << tablet_id() << " has " << missed_versions.size() << " missed version:"; - // print at most 10 version - for (int i = 0; i < 10 && i < missed_versions.size(); ++i) { - ss << missed_versions[i] << ","; - } - LOG(WARNING) << ss.str(); -} - Status Tablet::_contains_version(const Version& version) { // check if there exist a rowset contains the added rowset for (auto& it : _rs_version_map) { @@ -1984,13 +1892,12 @@ Status Tablet::create_transient_rowset_writer(RowsetWriterContext& context, void Tablet::_init_context_common_fields(RowsetWriterContext& context) { context.tablet_uid = tablet_uid(); - context.tablet_id = tablet_id(); context.partition_id = partition_id(); context.tablet_schema_hash = schema_hash(); context.rowset_type = tablet_meta()->preferred_rowset_type(); // Alpha Rowset will be removed in the future, so that if the tablet's default rowset type is - // alpah rowset, then set the newly created rowset to storage engine's default rowset. + // alpha rowset, then set the newly created rowset to storage engine's default rowset. 
if (context.rowset_type == ALPHA_ROWSET) { context.rowset_type = _engine.default_rowset_type(); } @@ -3132,7 +3039,7 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) return Status::OK(); } RowsetIdUnorderedSet cur_rowset_ids; - RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids)); + RETURN_IF_ERROR(get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids)); DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap)); @@ -3181,7 +3088,7 @@ Status Tablet::commit_phase_update_delete_bitmap( { std::shared_lock meta_rlock(_meta_lock); cur_version = max_version_unlocked().second; - RETURN_IF_ERROR(all_rs_id(cur_version, &cur_rowset_ids)); + RETURN_IF_ERROR(get_all_rs_id_unlocked(cur_version, &cur_rowset_ids)); _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add); @@ -3227,7 +3134,7 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, << tablet_id(); return Status::OK(); } - RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids)); + RETURN_IF_ERROR(get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids)); } auto t2 = watch.get_elapse_time_us(); @@ -3394,27 +3301,6 @@ Status Tablet::check_rowid_conversion( return Status::OK(); } -Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { - // Ensure that the obtained versions of rowsets are continuous - std::vector<Version> version_path; - RETURN_IF_ERROR(capture_consistent_versions_unlocked(Version(0, max_version), &version_path, - false, false)); - for (auto& ver : version_path) { - if (ver.second == 1) { - // [0-1] rowset is empty for each tablet, skip it - continue; - } - auto it = _rs_version_map.find(ver); - if (it == _rs_version_map.end()) { - return Status::Error<CAPTURE_ROWSET_ERROR, false>( - "fail to find Rowset for 
version. tablet={}, version={}", tablet_id(), - ver.to_string()); - } - rowset_ids->emplace(it->second->rowset_id()); - } - return Status::OK(); -} - bool Tablet::check_all_rowset_segment() { std::shared_lock rdlock(_meta_lock); for (auto& version_rowset : _rs_version_map) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index d953d8fce4f..e17c8b8ff85 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -149,14 +149,6 @@ public: Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add, std::vector<RowsetSharedPtr>& to_delete, bool check_delete = false); - // _rs_version_map and _stale_rs_version_map should be protected by _meta_lock - // The caller must call hold _meta_lock when call this two function. - const RowsetSharedPtr get_rowset_by_version(const Version& version, - bool find_is_stale = false) const; - const RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const; - - const RowsetSharedPtr rowset_with_max_version() const; - static TabletSchemaSPtr tablet_schema_with_merged_max_schema_version( const std::vector<RowsetMetaSharedPtr>& rowset_metas); @@ -170,9 +162,9 @@ public: // Given spec_version, find a continuous version path and store it in version_path. // If quiet is true, then only "does this path exist" is returned. // If skip_missing_version is true, return ok even there are missing versions. 
- Status capture_consistent_versions_unlocked(const Version& spec_version, - std::vector<Version>* version_path, + Status capture_consistent_versions_unlocked(const Version& spec_version, Versions* version_path, bool skip_missing_version, bool quiet) const; + // if quiet is true, no error log will be printed if there are missing versions Status check_version_integrity(const Version& version, bool quiet = false); bool check_version_exist(const Version& version) const; @@ -204,11 +196,6 @@ public: CompactionType compaction_type, std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy); - // operation for clone - void calc_missed_versions(int64_t spec_version, std::vector<Version>* missed_versions); - void calc_missed_versions_unlocked(int64_t spec_version, - std::vector<Version>* missed_versions) const; - // This function to find max continuous version from the beginning. // For example: If there are 1, 2, 3, 5, 6, 7 versions belongs tablet, then 3 is target. // 3 will be saved in "version", and 7 will be saved in "max_version", if max_version != nullptr @@ -483,7 +470,6 @@ public: RowsetSharedPtr dst_rowset, const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>& location_map); - Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const; void sort_block(vectorized::Block& in_block, vectorized::Block& output_block); bool check_all_rowset_segment(); @@ -538,7 +524,7 @@ public: int64_t get_table_id() { return _tablet_meta->table_id(); } - // binlog releated functions + // binlog related functions bool is_enable_binlog(); bool is_binlog_enabled() { return _tablet_meta->binlog_config().is_enable(); } int64_t binlog_ttl_ms() const { return _tablet_meta->binlog_config().ttl_seconds(); } @@ -560,7 +546,6 @@ public: private: Status _init_once_action(); - void _print_missed_versions(const std::vector<Version>& missed_versions) const; bool _contains_rowset(const RowsetId rowset_id); Status _contains_version(const 
Version& version); @@ -758,7 +743,7 @@ inline Version Tablet::max_version() const { inline uint64_t Tablet::segment_count() const { std::shared_lock rdlock(_meta_lock); uint64_t segment_nums = 0; - for (auto& rs_meta : _tablet_meta->all_rs_metas()) { + for (const auto& rs_meta : _tablet_meta->all_rs_metas()) { segment_nums += rs_meta->num_segments(); } return segment_nums; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 19cfc6fa964..df9a45b03c8 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -146,8 +146,8 @@ Status TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const TabletShar int32_t old_version, new_version; { std::shared_lock rdlock(existed_tablet->get_header_lock()); - const RowsetSharedPtr old_rowset = existed_tablet->rowset_with_max_version(); - const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version(); + const RowsetSharedPtr old_rowset = existed_tablet->get_rowset_with_max_version(); + const RowsetSharedPtr new_rowset = tablet->get_rowset_with_max_version(); // If new tablet is empty, it is a newly created schema change tablet. // the old tablet is dropped before add tablet. 
it should not exist old tablet if (new_rowset == nullptr) { diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 64bf7d4c198..2d131afa870 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -295,6 +295,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id TabletMeta::TabletMeta(const TabletMeta& b) : _table_id(b._table_id), + _index_id(b._index_id), _partition_id(b._partition_id), _tablet_id(b._tablet_id), _replica_id(b._replica_id), @@ -500,6 +501,7 @@ Status TabletMeta::deserialize(const string& meta_binary) { void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { _table_id = tablet_meta_pb.table_id(); + _index_id = tablet_meta_pb.index_id(); _partition_id = tablet_meta_pb.partition_id(); _tablet_id = tablet_meta_pb.tablet_id(); _replica_id = tablet_meta_pb.replica_id(); @@ -614,6 +616,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { tablet_meta_pb->set_table_id(table_id()); + tablet_meta_pb->set_index_id(index_id()); tablet_meta_pb->set_partition_id(partition_id()); tablet_meta_pb->set_tablet_id(tablet_id()); tablet_meta_pb->set_replica_id(replica_id()); @@ -857,6 +860,7 @@ Status TabletMeta::set_partition_id(int64_t partition_id) { bool operator==(const TabletMeta& a, const TabletMeta& b) { if (a._table_id != b._table_id) return false; + if (a._index_id != b._index_id) return false; if (a._partition_id != b._partition_id) return false; if (a._tablet_id != b._tablet_id) return false; if (a._replica_id != b._replica_id) return false; diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 094bb21507d..d5d1322d341 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -141,6 +141,7 @@ public: TabletUid tablet_uid() const; void set_tablet_uid(TabletUid uid) { _tablet_uid = uid; } int64_t table_id() const; + int64_t index_id() const; int64_t 
partition_id() const; int64_t tablet_id() const; int64_t replica_id() const; @@ -272,6 +273,7 @@ private: private: int64_t _table_id = 0; + int64_t _index_id = 0; int64_t _partition_id = 0; int64_t _tablet_id = 0; int64_t _replica_id = 0; @@ -526,6 +528,10 @@ inline int64_t TabletMeta::table_id() const { return _table_id; } +inline int64_t TabletMeta::index_id() const { + return _index_id; +} + inline int64_t TabletMeta::partition_id() const { return _partition_id; } diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index d8c7a54cb74..8d1d9c966ba 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -188,7 +188,7 @@ Status EngineCloneTask::_do_clone() { } bool is_new_tablet = tablet == nullptr; // try to incremental clone - std::vector<Version> missed_versions; + Versions missed_versions; // try to repair a tablet with missing version if (tablet != nullptr) { std::shared_lock migration_rlock(tablet->get_migration_lock(), std::try_to_lock); @@ -218,7 +218,7 @@ Status EngineCloneTask::_do_clone() { } } - tablet->calc_missed_versions(specified_version, &missed_versions); + missed_versions = tablet->get_missed_versions(specified_version); // if missed version size is 0, then it is useless to clone from remote be, it means local data is // completed. Or remote be will just return header not the rowset files. clone will failed. @@ -740,8 +740,7 @@ Status EngineCloneTask::_finish_incremental_clone(Tablet* tablet, /// Get missing versions again from local tablet. /// We got it before outside the lock, so it has to be got again. - std::vector<Version> missed_versions; - tablet->calc_missed_versions_unlocked(version, &missed_versions); + Versions missed_versions = tablet->get_missed_versions_unlocked(version); VLOG_NOTICE << "get missed versions again when finish incremental clone. 
" << "tablet=" << tablet->tablet_id() << ", clone version=" << version << ", missed_versions_size=" << missed_versions.size(); diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index efa85406c69..37857efe4df 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -69,7 +69,7 @@ Status EngineStorageMigrationTask::_get_versions(int32_t start_version, int32_t* return Status::NotSupported( "currently not support migrate tablet with cooldowned remote data"); } - const RowsetSharedPtr last_version = _tablet->rowset_with_max_version(); + const RowsetSharedPtr last_version = _tablet->get_rowset_with_max_version(); if (last_version == nullptr) { return Status::InternalError("failed to get rowset with max version, tablet={}", _tablet->tablet_id()); diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index c48ebf98a48..4e6553975fa 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -658,8 +658,8 @@ TEST_F(TestDeltaWriter, vec_write) { std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl; OlapMeta* meta = tablet->data_dir()->get_meta(); Version version; - version.first = tablet->rowset_with_max_version()->end_version() + 1; - version.second = tablet->rowset_with_max_version()->end_version() + 1; + version.first = tablet->get_rowset_with_max_version()->end_version() + 1; + version.second = tablet->get_rowset_with_max_version()->end_version() + 1; std::cout << "start to add rowset version:" << version.first << "-" << version.second << std::endl; std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; @@ -751,8 +751,8 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl; OlapMeta* meta = tablet->data_dir()->get_meta(); Version version; - version.first = 
tablet->rowset_with_max_version()->end_version() + 1; - version.second = tablet->rowset_with_max_version()->end_version() + 1; + version.first = tablet->get_rowset_with_max_version()->end_version() + 1; + version.second = tablet->get_rowset_with_max_version()->end_version() + 1; std::cout << "start to add rowset version:" << version.first << "-" << version.second << std::endl; std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; @@ -899,8 +899,8 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { // publish version on delta writer 1 success { Version version; - version.first = tablet->rowset_with_max_version()->end_version() + 1; - version.second = tablet->rowset_with_max_version()->end_version() + 1; + version.first = tablet->get_rowset_with_max_version()->end_version() + 1; + version.second = tablet->get_rowset_with_max_version()->end_version() + 1; std::cout << "start to add rowset version:" << version.first << "-" << version.second << std::endl; std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; @@ -950,8 +950,8 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { ASSERT_TRUE(res.ok()); Version version; - version.first = tablet->rowset_with_max_version()->end_version() + 1; - version.second = tablet->rowset_with_max_version()->end_version() + 1; + version.first = tablet->get_rowset_with_max_version()->end_version() + 1; + version.second = tablet->get_rowset_with_max_version()->end_version() + 1; std::cout << "start to add rowset version:" << version.first << "-" << version.second << std::endl; std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; @@ -982,7 +982,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { ASSERT_EQ(1, segments.size()); } - auto cur_version = tablet->rowset_with_max_version()->end_version(); + auto cur_version = tablet->get_rowset_with_max_version()->end_version(); // read data from rowset 1, verify the data correct { OlapReaderStatistics stats; diff --git 
a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index c65340522c6..46b4f18c28e 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -218,8 +218,8 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id); OlapMeta* meta = tablet->data_dir()->get_meta(); Version version; - version.first = tablet->rowset_with_max_version()->end_version() + 1; - version.second = tablet->rowset_with_max_version()->end_version() + 1; + version.first = tablet->get_rowset_with_max_version()->end_version() + 1; + version.second = tablet->get_rowset_with_max_version()->end_version() + 1; std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; StorageEngine::instance()->txn_manager()->get_txn_related_tablets( write_req.txn_id, write_req.partition_id, &tablet_related_rs); diff --git a/be/test/olap/remote_rowset_gc_test.cpp b/be/test/olap/remote_rowset_gc_test.cpp index fe8ab3b0f10..19b02dca9c4 100644 --- a/be/test/olap/remote_rowset_gc_test.cpp +++ b/be/test/olap/remote_rowset_gc_test.cpp @@ -211,8 +211,8 @@ TEST_F(RemoteRowsetGcTest, normal) { k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); OlapMeta* meta = tablet->data_dir()->get_meta(); Version version; - version.first = tablet->rowset_with_max_version()->end_version() + 1; - version.second = tablet->rowset_with_max_version()->end_version() + 1; + version.first = tablet->get_rowset_with_max_version()->end_version() + 1; + version.second = tablet->get_rowset_with_max_version()->end_version() + 1; std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; StorageEngine::instance()->txn_manager()->get_txn_related_tablets( write_req.txn_id, write_req.partition_id, &tablet_related_rs); diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index 
d83ea3eb016..5f307611abe 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -384,8 +384,8 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha *tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); OlapMeta* meta = (*tablet)->data_dir()->get_meta(); Version version; - version.first = (*tablet)->rowset_with_max_version()->end_version() + 1; - version.second = (*tablet)->rowset_with_max_version()->end_version() + 1; + version.first = (*tablet)->get_rowset_with_max_version()->end_version() + 1; + version.second = (*tablet)->get_rowset_with_max_version()->end_version() + 1; std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; k_engine->txn_manager()->get_txn_related_tablets(write_req.txn_id, write_req.partition_id, &tablet_related_rs); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org