github-actions[bot] commented on code in PR #30128: URL: https://github.com/apache/doris/pull/30128#discussion_r1458440818
########## be/src/cloud/cloud_meta_mgr.cpp: ########## @@ -271,28 +274,342 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tab } Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data) { - return Status::NotSupported("CloudMetaMgr::sync_tablet_rowsets is not implemented"); + using namespace std::chrono; + + TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); + + std::shared_ptr<MetaService_Stub> stub; + RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub)); + + int tried = 0; + while (true) { + brpc::Controller cntl; + cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); + GetRowsetRequest req; + GetRowsetResponse resp; + + int64_t tablet_id = tablet->tablet_id(); + int64_t table_id = tablet->table_id(); + int64_t index_id = tablet->index_id(); + req.set_cloud_unique_id(config::cloud_unique_id); + auto* idx = req.mutable_idx(); + idx->set_tablet_id(tablet_id); + idx->set_table_id(table_id); + idx->set_index_id(index_id); + idx->set_partition_id(tablet->partition_id()); + { + std::shared_lock rlock(tablet->get_header_lock()); + req.set_start_version(tablet->local_max_version() + 1); + req.set_base_compaction_cnt(tablet->base_compaction_cnt()); + req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt()); + req.set_cumulative_point(tablet->cumulative_layer_point()); + } + req.set_end_version(-1); + VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString(); + + stub->get_rowset(&cntl, &req, &resp, nullptr); + int64_t latency = cntl.latency_us(); + g_get_rowset_latency << latency; + int retry_times = config::meta_service_rpc_retry_times; + if (cntl.Failed()) { + if (tried++ < retry_times) { + auto rng = make_random_engine(); + std::uniform_int_distribution<uint32_t> u(20, 200); + std::uniform_int_distribution<uint32_t> u1(500, 1000); + uint32_t duration_ms = tried >= 100 ? 
u(rng) : u1(rng); + std::this_thread::sleep_for(milliseconds(duration_ms)); + LOG_INFO("failed to get rowset meta") + .tag("reason", cntl.ErrorText()) + .tag("tablet_id", tablet_id) + .tag("table_id", table_id) + .tag("index_id", index_id) + .tag("partition_id", tablet->partition_id()) + .tag("tried", tried) + .tag("sleep", duration_ms); + continue; + } + return Status::RpcError("failed to get rowset meta: {}", cntl.ErrorText()); + } + if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { + return Status::NotFound("failed to get rowset meta: {}", resp.status().msg()); + } + if (resp.status().code() != MetaServiceCode::OK) { + return Status::InternalError("failed to get rowset meta: {}", resp.status().msg()); + } + if (latency > 100 * 1000) { // 100ms + LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size() + << ", latency=" << latency << "us"; + } else { + LOG_EVERY_N(INFO, 100) + << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size() + << ", latency=" << latency << "us"; + } + + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + tablet->last_sync_time_s = now; + + if (tablet->enable_unique_key_merge_on_write()) { + DeleteBitmap delete_bitmap(tablet_id); + int64_t old_max_version = req.start_version() - 1; + auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), + resp.stats(), req.idx(), &delete_bitmap); + if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) { + LOG_WARNING("rowset meta is expired, need to retry") + .tag("tablet", tablet->tablet_id()) + .tag("tried", tried) + .error(st); + continue; + } + if (!st.ok()) { + LOG_WARNING("failed to get delete bimtap") + .tag("tablet", tablet->tablet_id()) + .error(st); + return st; + } + tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap); + } + { + const auto& stats = resp.stats(); + std::unique_lock wlock(tablet->get_header_lock()); + + // ATTN: we are facing following 
data race + // + // resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8 + // + // BE-compaction-thread meta-service BE-query-thread + // | | | + // local | commit cumu-compaction | | + // cc_cnt=0 | ---------------------------> | sync rowset (long rpc, local cc_cnt=0 ) | local + // | | <----------------------------------------- | cc_cnt=0 + // | | -. | + // local | done cc_cnt=1 | \ | + // cc_cnt=1 | <--------------------------- | \ | + // | | \ returned with resp cc_cnt=0 (snapshot) | + // | | '------------------------------------> | local + // | | | cc_cnt=1 + // | | | + // | | | CHECK FAIL + // | | | need retry + // To get rid of just retry syncing tablet + if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() || + stats.cumulative_compaction_cnt() < tablet->cumulative_compaction_cnt()) + [[unlikely]] { + // stale request, ignore + LOG_WARNING("stale get rowset meta request") + .tag("resp_base_compaction_cnt", stats.base_compaction_cnt()) + .tag("base_compaction_cnt", tablet->base_compaction_cnt()) + .tag("resp_cumulative_compaction_cnt", stats.cumulative_compaction_cnt()) + .tag("cumulative_compaction_cnt", tablet->cumulative_compaction_cnt()) + .tag("tried", tried); + if (tried++ < 10) continue; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (tried++ < 10) { continue; } ``` ########## be/src/olap/base_tablet.cpp: ########## @@ -119,4 +118,149 @@ bool BaseTablet::_reconstruct_version_tracker_if_necessary() { return false; } -} /* namespace doris */ +// snapshot manager may call this api to check if version exists, so that +// the version maybe not exist +RowsetSharedPtr BaseTablet::get_rowset_by_version(const Version& version, + bool find_in_stale) const { + auto iter = _rs_version_map.find(version); + if (iter == _rs_version_map.end()) { + if (find_in_stale) { + return 
get_stale_rowset_by_version(version); + } + return nullptr; + } + return iter->second; +} + +RowsetSharedPtr BaseTablet::get_stale_rowset_by_version(const Version& version) const { + auto iter = _stale_rs_version_map.find(version); + if (iter == _stale_rs_version_map.end()) { + VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << tablet_id(); + return nullptr; + } + return iter->second; +} + +// Already under _meta_lock +RowsetSharedPtr BaseTablet::get_rowset_with_max_version() const { Review Comment: warning: method 'get_rowset_with_max_version' can be made static [readability-convert-member-functions-to-static] be/src/olap/base_tablet.h:95: ```diff - RowsetSharedPtr get_rowset_with_max_version() const; + static RowsetSharedPtr get_rowset_with_max_version() ; ``` ```suggestion RowsetSharedPtr BaseTablet::get_rowset_with_max_version() { ``` ########## be/src/olap/base_tablet.cpp: ########## @@ -119,4 +118,149 @@ return false; } -} /* namespace doris */ +// snapshot manager may call this api to check if version exists, so that +// the version maybe not exist +RowsetSharedPtr BaseTablet::get_rowset_by_version(const Version& version, + bool find_in_stale) const { + auto iter = _rs_version_map.find(version); + if (iter == _rs_version_map.end()) { + if (find_in_stale) { + return get_stale_rowset_by_version(version); + } + return nullptr; + } + return iter->second; +} + +RowsetSharedPtr BaseTablet::get_stale_rowset_by_version(const Version& version) const { + auto iter = _stale_rs_version_map.find(version); + if (iter == _stale_rs_version_map.end()) { + VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << tablet_id(); + return nullptr; + } + return iter->second; +} + +// Already under _meta_lock +RowsetSharedPtr BaseTablet::get_rowset_with_max_version() const { + Version max_version = _tablet_meta->max_version(); + if (max_version.first == -1) { + return nullptr; + } + + auto iter = _rs_version_map.find(max_version); + if (iter == 
_rs_version_map.end()) { + DCHECK(false) << "invalid version:" << max_version; + return nullptr; + } + return iter->second; +} + +Status BaseTablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { + // Ensure that the obtained versions of rowsets are continuous + std::vector<Version> version_path; + RETURN_IF_ERROR(capture_consistent_versions_unlocked(Version(0, max_version), &version_path, + false, false)); + for (auto& ver : version_path) { + if (ver.second == 1) { + // [0-1] rowset is empty for each tablet, skip it + continue; + } + auto it = _rs_version_map.find(ver); + if (it == _rs_version_map.end()) { + return Status::Error<CAPTURE_ROWSET_ERROR, false>( + "fail to find Rowset for version. tablet={}, version={}", tablet_id(), + ver.to_string()); + } + rowset_ids->emplace(it->second->rowset_id()); + } + return Status::OK(); +} + +Status BaseTablet::capture_consistent_versions_unlocked(const Version& spec_version, + std::vector<Version>* version_path, + bool skip_missing_version, + bool quiet) const { + Status status = + _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); + if (!status.ok() && !quiet) { + std::vector<Version> missed_versions; + calc_missed_versions_unlocked(spec_version.second, &missed_versions); + if (missed_versions.empty()) { + // if version_path is null, it may be a compaction check logic. + // so to avoid print too many logs. + if (version_path != nullptr) { + LOG(WARNING) << "tablet:" << tablet_id() + << ", version already has been merged. 
spec_version: " << spec_version + << ", max_version: " << max_version_unlocked(); + } + status = Status::Error<VERSION_ALREADY_MERGED>( + "missed_versions is empty, spec_version " + "{}, max_version {}, tablet_id {}", + spec_version.second, max_version_unlocked().second, tablet_id()); + } else { + if (version_path != nullptr) { + LOG(WARNING) << "status:" << status << ", tablet:" << tablet_id() + << ", missed version for version:" << spec_version; + _print_missed_versions(missed_versions); + if (skip_missing_version) { + LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id(); + return Status::OK(); + } + } + } + } + return status; +} + +void BaseTablet::_print_missed_versions(const std::vector<Version>& missed_versions) const { + std::stringstream ss; + ss << tablet_id() << " has " << missed_versions.size() << " missed version:"; + // print at most 10 version + for (int i = 0; i < 10 && i < missed_versions.size(); ++i) { + ss << missed_versions[i] << ","; + } + LOG(WARNING) << ss.str(); +} + +void BaseTablet::calc_missed_versions(int64_t spec_version, std::vector<Version>* missed_versions) { Review Comment: warning: method 'calc_missed_versions' can be made static [readability-convert-member-functions-to-static] ```suggestion static void BaseTablet::calc_missed_versions(int64_t spec_version, std::vector<Version>* missed_versions) { ``` ########## be/src/olap/base_tablet.h: ########## @@ -117,4 +145,34 @@ std::atomic<int64_t> published_count = 0; }; +// TODO(lingbin): Why other methods which need to get information from _tablet_meta +// are not locked, here needs a comment to explain. 
+inline size_t BaseTablet::num_rows() { + std::shared_lock rdlock(_meta_lock); + return _tablet_meta->num_rows(); +} + +inline int BaseTablet::version_count() const { Review Comment: warning: method 'version_count' can be made static [readability-convert-member-functions-to-static] be/src/olap/base_tablet.h:112: ```diff - int version_count() const; + static int version_count() ; ``` ```suggestion inline int BaseTablet::version_count() { ``` ########## be/src/olap/base_tablet.cpp: ########## @@ -119,4 +118,149 @@ return false; } -} /* namespace doris */ +// snapshot manager may call this api to check if version exists, so that +// the version maybe not exist +RowsetSharedPtr BaseTablet::get_rowset_by_version(const Version& version, + bool find_in_stale) const { + auto iter = _rs_version_map.find(version); + if (iter == _rs_version_map.end()) { + if (find_in_stale) { + return get_stale_rowset_by_version(version); + } + return nullptr; + } + return iter->second; +} + +RowsetSharedPtr BaseTablet::get_stale_rowset_by_version(const Version& version) const { + auto iter = _stale_rs_version_map.find(version); + if (iter == _stale_rs_version_map.end()) { + VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << tablet_id(); + return nullptr; + } + return iter->second; +} + +// Already under _meta_lock +RowsetSharedPtr BaseTablet::get_rowset_with_max_version() const { + Version max_version = _tablet_meta->max_version(); + if (max_version.first == -1) { + return nullptr; + } + + auto iter = _rs_version_map.find(max_version); + if (iter == _rs_version_map.end()) { + DCHECK(false) << "invalid version:" << max_version; + return nullptr; + } + return iter->second; +} + +Status BaseTablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { Review Comment: warning: method 'all_rs_id' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status BaseTablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* 
rowset_ids) { ``` ########## be/src/olap/base_tablet.h: ########## @@ -117,4 +145,34 @@ class BaseTablet { std::atomic<int64_t> published_count = 0; }; +// TODO(lingbin): Why other methods which need to get information from _tablet_meta +// are not locked, here needs a comment to explain. +inline size_t BaseTablet::num_rows() { Review Comment: warning: method 'num_rows' can be made static [readability-convert-member-functions-to-static] be/src/olap/base_tablet.h:111: ```diff - size_t num_rows(); + static size_t num_rows(); ``` ########## be/src/olap/base_tablet.h: ########## @@ -117,4 +145,34 @@ std::atomic<int64_t> published_count = 0; }; +// TODO(lingbin): Why other methods which need to get information from _tablet_meta +// are not locked, here needs a comment to explain. +inline size_t BaseTablet::num_rows() { + std::shared_lock rdlock(_meta_lock); + return _tablet_meta->num_rows(); +} + +inline int BaseTablet::version_count() const { + std::shared_lock rdlock(_meta_lock); + return _tablet_meta->version_count(); +} + +inline Version BaseTablet::max_version() const { + std::shared_lock rdlock(_meta_lock); + return _tablet_meta->max_version(); +} + +inline uint64_t BaseTablet::segment_count() const { Review Comment: warning: method 'segment_count' can be made static [readability-convert-member-functions-to-static] be/src/olap/base_tablet.h:113: ```diff - uint64_t segment_count() const; + static uint64_t segment_count() ; ``` ```suggestion inline uint64_t BaseTablet::segment_count() { ``` ########## be/src/olap/tablet_meta.cpp: ########## @@ -850,6 +853,7 @@ Status TabletMeta::set_partition_id(int64_t partition_id) { bool operator==(const TabletMeta& a, const TabletMeta& b) { if (a._table_id != b._table_id) return false; + if (a._index_id != b._index_id) return false; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (a._index_id != b._index_id) { return false; } ``` ########## 
be/src/olap/rowset/rowset_meta.cpp: ########## @@ -102,18 +102,20 @@ void RowsetMeta::set_fs(io::FileSystemSPtr fs) { _fs = std::move(fs); } -void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const { +void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema) const { Review Comment: warning: method 'to_rowset_pb' can be made static [readability-convert-member-functions-to-static] ```suggestion static void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema) { ``` ########## be/src/olap/base_tablet.h: ########## @@ -117,4 +145,34 @@ std::atomic<int64_t> published_count = 0; }; +// TODO(lingbin): Why other methods which need to get information from _tablet_meta +// are not locked, here needs a comment to explain. +inline size_t BaseTablet::num_rows() { + std::shared_lock rdlock(_meta_lock); + return _tablet_meta->num_rows(); +} + +inline int BaseTablet::version_count() const { + std::shared_lock rdlock(_meta_lock); + return _tablet_meta->version_count(); +} + +inline Version BaseTablet::max_version() const { Review Comment: warning: method 'max_version' can be made static [readability-convert-member-functions-to-static] be/src/olap/base_tablet.h:114: ```diff - Version max_version() const; + static Version max_version() ; ``` ```suggestion inline Version BaseTablet::max_version() { ``` ########## be/src/olap/base_tablet.cpp: ########## @@ -119,4 +118,149 @@ return false; } -} /* namespace doris */ +// snapshot manager may call this api to check if version exists, so that +// the version maybe not exist +RowsetSharedPtr BaseTablet::get_rowset_by_version(const Version& version, + bool find_in_stale) const { + auto iter = _rs_version_map.find(version); + if (iter == _rs_version_map.end()) { + if (find_in_stale) { + return get_stale_rowset_by_version(version); + } + return nullptr; + } + return iter->second; +} + +RowsetSharedPtr BaseTablet::get_stale_rowset_by_version(const Version& version) const { + auto iter = 
_stale_rs_version_map.find(version); + if (iter == _stale_rs_version_map.end()) { + VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << tablet_id(); + return nullptr; + } + return iter->second; +} + +// Already under _meta_lock +RowsetSharedPtr BaseTablet::get_rowset_with_max_version() const { + Version max_version = _tablet_meta->max_version(); + if (max_version.first == -1) { + return nullptr; + } + + auto iter = _rs_version_map.find(max_version); + if (iter == _rs_version_map.end()) { + DCHECK(false) << "invalid version:" << max_version; + return nullptr; + } + return iter->second; +} + +Status BaseTablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { + // Ensure that the obtained versions of rowsets are continuous + std::vector<Version> version_path; + RETURN_IF_ERROR(capture_consistent_versions_unlocked(Version(0, max_version), &version_path, + false, false)); + for (auto& ver : version_path) { + if (ver.second == 1) { + // [0-1] rowset is empty for each tablet, skip it + continue; + } + auto it = _rs_version_map.find(ver); + if (it == _rs_version_map.end()) { + return Status::Error<CAPTURE_ROWSET_ERROR, false>( + "fail to find Rowset for version. tablet={}, version={}", tablet_id(), + ver.to_string()); + } + rowset_ids->emplace(it->second->rowset_id()); + } + return Status::OK(); +} + +Status BaseTablet::capture_consistent_versions_unlocked(const Version& spec_version, + std::vector<Version>* version_path, + bool skip_missing_version, + bool quiet) const { + Status status = + _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); + if (!status.ok() && !quiet) { + std::vector<Version> missed_versions; + calc_missed_versions_unlocked(spec_version.second, &missed_versions); + if (missed_versions.empty()) { + // if version_path is null, it may be a compaction check logic. + // so to avoid print too many logs. 
+ if (version_path != nullptr) { + LOG(WARNING) << "tablet:" << tablet_id() + << ", version already has been merged. spec_version: " << spec_version + << ", max_version: " << max_version_unlocked(); + } + status = Status::Error<VERSION_ALREADY_MERGED>( + "missed_versions is empty, spec_version " + "{}, max_version {}, tablet_id {}", + spec_version.second, max_version_unlocked().second, tablet_id()); + } else { + if (version_path != nullptr) { + LOG(WARNING) << "status:" << status << ", tablet:" << tablet_id() + << ", missed version for version:" << spec_version; + _print_missed_versions(missed_versions); + if (skip_missing_version) { + LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id(); + return Status::OK(); + } + } + } + } + return status; +} + +void BaseTablet::_print_missed_versions(const std::vector<Version>& missed_versions) const { + std::stringstream ss; + ss << tablet_id() << " has " << missed_versions.size() << " missed version:"; + // print at most 10 version + for (int i = 0; i < 10 && i < missed_versions.size(); ++i) { + ss << missed_versions[i] << ","; + } + LOG(WARNING) << ss.str(); +} + +void BaseTablet::calc_missed_versions(int64_t spec_version, std::vector<Version>* missed_versions) { + std::shared_lock rdlock(_meta_lock); + calc_missed_versions_unlocked(spec_version, missed_versions); +} + +// for example: +// [0-4][5-5][8-8][9-9] +// if spec_version = 6, we still return {7} other than {6, 7} +void BaseTablet::calc_missed_versions_unlocked(int64_t spec_version, + std::vector<Version>* missed_versions) const { Review Comment: warning: method 'calc_missed_versions_unlocked' can be made static [readability-convert-member-functions-to-static] ```suggestion static void BaseTablet::calc_missed_versions_unlocked(int64_t spec_version, std::vector<Version>* missed_versions) { ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org