dataroaring commented on code in PR #53540:
URL: https://github.com/apache/doris/pull/53540#discussion_r2344042554
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -165,6 +200,116 @@ Status CloudTablet::capture_rs_readers(const Version&
spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}
+Status CloudTablet::capture_rs_readers_prefer_cache(const Version&
spec_version,
+ std::vector<RowSetSplits>*
rs_splits) {
+ g_capture_prefer_cache_count << 1;
+ Versions version_path;
+ std::shared_lock rlock(_meta_lock);
+
RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_prefer_cache(
+ spec_version, version_path,
+ [&](int64_t start, int64_t end) { return
rowset_is_warmed_up(start, end); }));
+ int64_t path_max_version = version_path.back().second;
+ VLOG_DEBUG << fmt::format(
+ "[verbose] CloudTablet::capture_rs_readers_prefer_cache, capture
path: {}, "
+ "tablet_id={}, spec_version={}, path_max_version={}",
+ fmt::join(version_path | std::views::transform([](const auto&
version) {
+ return fmt::format("{}", version.to_string());
+ }),
+ ", "),
+ tablet_id(), spec_version.to_string(), path_max_version);
+ return capture_rs_readers_unlocked(version_path, rs_splits);
+}
+
+bool CloudTablet::rowset_is_warmed_up(int64_t start_version, int64_t
end_version) {
+ if (start_version > end_version) {
+ return false;
+ }
+ Version version {start_version, end_version};
+ auto it = _rs_version_map.find(version);
+ if (it == _rs_version_map.end()) {
+ it = _stale_rs_version_map.find(version);
+ if (it == _stale_rs_version_map.end()) {
+ LOG_WARNING(
+ "fail to find Rowset in rs_version or stale_rs_version for
version. "
+ "tablet={}, version={}",
+ tablet_id(), version.to_string());
+ return false;
+ }
+ }
+ const auto& rs = it->second;
+ if (rs->visible_timestamp() < _engine.startup_timepoint()) {
+ // We only care about rowsets that are created after startup time
point. For other rowsets,
+ // we assume they are warmed up.
+ return true;
+ }
+ return is_rowset_warmed_up(rs->rowset_id());
+};
+
+Status CloudTablet::capture_rs_readers_with_freshness_tolerance(
+ const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
+ int64_t query_freshness_tolerance_ms) {
+ g_capture_with_freshness_tolerance_count << 1;
+ using namespace std::chrono;
+ auto freshness_limit_tp = system_clock::now() -
milliseconds(query_freshness_tolerance_ms);
+ // find a version path where every edge(rowset) has been warmuped
+ Versions version_path;
+ std::shared_lock rlock(_meta_lock);
+ if (enable_unique_key_merge_on_write()) {
+ // For merge-on-write table, newly generated delete bitmap marks will
be on the rowsets which are in newest layout.
+ // So we can ony capture rowsets which are in newest data layout.
Otherwise there may be data correctness issue.
+
RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
+ spec_version, version_path,
+ [&](int64_t start, int64_t end) { return
rowset_is_warmed_up(start, end); }));
+ } else {
+
RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator(
+ spec_version, version_path,
+ [&](int64_t start, int64_t end) { return
rowset_is_warmed_up(start, end); }));
+ }
+ int64_t path_max_version = version_path.back().second;
+ auto should_be_visible_but_not_warmed_up = [&](const auto& rs_meta) ->
bool {
+ if (rs_meta->version() == Version {0, 1}) {
+ // skip rowset[0-1]
+ return false;
+ }
+ bool ret = rs_meta->start_version() > path_max_version &&
+ rs_meta->visible_timestamp() < freshness_limit_tp;
+ if (ret && config::read_cluster_cache_opt_verbose_log) {
+ std::time_t t1 =
system_clock::to_time_t(rs_meta->visible_timestamp());
+ std::tm tm1 = *std::localtime(&t1);
+ std::ostringstream oss1;
+ oss1 << std::put_time(&tm1, "%Y-%m-%d %H:%M:%S");
+
+ std::time_t t2 = system_clock::to_time_t(freshness_limit_tp);
+ std::tm tm2 = *std::localtime(&t2);
+ std::ostringstream oss2;
+ oss2 << std::put_time(&tm2, "%Y-%m-%d %H:%M:%S");
+ LOG_INFO(
+ "[verbose]
CloudTablet::capture_rs_readers_with_freshness_tolerance, "
+ "find a rowset which should be visible but not warmed up,
tablet_id={}, "
+ "path_max_version={}, rowset_id={}, version={},
visible_time={}, "
+ "freshness_limit={}, version_graph={},
rowset_warmup_digest={}",
+ tablet_id(), path_max_version,
rs_meta->rowset_id().to_string(),
+ rs_meta->version().to_string(), oss1.str(), oss2.str(),
+ _timestamped_version_tracker.debug_string(),
rowset_warmup_digest());
+ }
+ return ret;
+ };
+ // use std::views::concat after C++26
+ bool should_fallback = std::ranges::any_of(_tablet_meta->all_rs_metas(),
+
should_be_visible_but_not_warmed_up) ||
+
std::ranges::any_of(_tablet_meta->all_stale_rs_metas(),
+
should_be_visible_but_not_warmed_up);
Review Comment:
We'd better not traverse all metas each time. Can we skip most of them? We
only care about new rowsets.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]