This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 414eb2a8ebd [fix](cooldown) Fix bug in follow cooldowned data (#32801) 414eb2a8ebd is described below commit 414eb2a8ebdff514af5e5d7335ebccfd756ec90f Author: plat1ko <platonekos...@gmail.com> AuthorDate: Tue Mar 26 12:04:55 2024 +0800 [fix](cooldown) Fix bug in follow cooldowned data (#32801) --- be/src/olap/tablet.cpp | 44 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index f9b12cc671d..9b745a92479 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -565,7 +565,9 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add, // delete rowset in "to_delete" directly for (auto& rs : to_delete) { LOG(INFO) << "add unused rowset " << rs->rowset_id() << " because of same version"; - StorageEngine::instance()->add_unused_rowset(rs); + if (rs->is_local()) { + StorageEngine::instance()->add_unused_rowset(rs); + } } } return Status::OK(); @@ -604,7 +606,9 @@ void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool } else { for (auto& rs : to_delete) { _timestamped_version_tracker.delete_version(rs->version()); - StorageEngine::instance()->add_unused_rowset(rs); + if (rs->is_local()) { + StorageEngine::instance()->add_unused_rowset(rs); + } } } } @@ -830,7 +834,9 @@ void Tablet::delete_expired_stale_rowset() { auto it = _stale_rs_version_map.find(timestampedVersion->version()); if (it != _stale_rs_version_map.end()) { // delete rowset - StorageEngine::instance()->add_unused_rowset(it->second); + if (it->second->is_local()) { + StorageEngine::instance()->add_unused_rowset(it->second); + } _stale_rs_version_map.erase(it); VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version[" << timestampedVersion->version().first << "," @@ -2311,6 +2317,12 @@ Status Tablet::_follow_cooldowned_data() { std::vector<RowsetSharedPtr> overlap_rowsets; bool version_aligned = false; + // Holding these to delete rowsets' shared ptr until save meta can avoid trash sweeping thread + // deleting these rowsets' files before rowset meta has been removed from disk, which may cause + // data loss when BE reboot before save meta to disk. + std::vector<RowsetSharedPtr> to_delete; + std::vector<RowsetSharedPtr> to_add; + { std::lock_guard wlock(_meta_lock); SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); @@ -2336,17 +2348,18 @@ Status Tablet::_follow_cooldowned_data() { } } std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), Rowset::comparator); + + // Find different rowset in `overlap_rowsets` and `cooldown_meta_pb.rs_metas` auto rs_pb_it = cooldown_meta_pb.rs_metas().begin(); auto rs_it = overlap_rowsets.begin(); for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it != overlap_rowsets.end(); ++rs_pb_it, ++rs_it) { - // skip cooldowned rowset with same version in BE - if ((*rs_it)->is_local() || rs_pb_it->end_version() != (*rs_it)->end_version()) { + if (rs_pb_it->rowset_id_v2() != (*rs_it)->rowset_id().to_string()) { break; } } - std::vector<RowsetSharedPtr> to_delete(rs_it, overlap_rowsets.end()); - std::vector<RowsetSharedPtr> to_add; + + to_delete.assign(rs_it, overlap_rowsets.end()); to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it); for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) { auto rs_meta = std::make_shared<RowsetMeta>(); @@ -2361,12 +2374,29 @@ Status Tablet::_follow_cooldowned_data() { // TODO(plat1ko): process primary key _tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id()); } + { std::lock_guard rlock(_meta_lock); SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); save_meta(); } + if (!to_add.empty()) { + LOG(INFO) << "modify rowsets when follow cooldowned data, tablet_id=" << tablet_id() + << [&]() { + std::stringstream ss; + ss << " delete rowsets:\n"; + for (auto&& rs : to_delete) { + ss << rs->version() << ' ' << rs->rowset_id() << '\n'; + } + ss << "add rowsets:\n"; + for (auto&& rs : to_add) { + ss << rs->version() << ' ' << rs->rowset_id() << '\n'; + } + return ss.str(); + }(); + } + return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org