This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new cc5fa509ad [fix](cooldown) Fix bug in concurrent `update_cooldown_conf` and operations that update cooldowned data (#17086) cc5fa509ad is described below commit cc5fa509ad0059bc337dba84858e907782baa9d0 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Fri Mar 3 14:36:58 2023 +0800 [fix](cooldown) Fix bug in concurrent `update_cooldown_conf` and operations that update cooldowned data (#17086) --- be/src/agent/task_worker_pool.cpp | 1 + be/src/olap/cold_data_compaction.cpp | 17 +-- be/src/olap/olap_define.h | 11 ++ be/src/olap/olap_server.cpp | 2 +- be/src/olap/rowset/beta_rowset.cpp | 10 -- be/src/olap/rowset/beta_rowset.h | 4 - be/src/olap/tablet.cpp | 155 ++++++++++++--------- be/src/olap/tablet.h | 44 ++++-- .../java/org/apache/doris/catalog/Replica.java | 9 ++ .../apache/doris/catalog/TabletInvertedIndex.java | 5 +- .../org/apache/doris/master/ReportHandler.java | 38 +++-- .../apache/doris/service/FrontendServiceImpl.java | 12 +- gensrc/thrift/MasterService.thrift | 2 +- 13 files changed, 192 insertions(+), 118 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 3ed931ddaf..4056e0d176 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1848,6 +1848,7 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() { } tablet->update_cooldown_conf(cooldown_conf.cooldown_term, cooldown_conf.cooldown_replica_id); + // TODO(AlexYue): if `update_cooldown_conf` success, async call `write_cooldown_meta` } } } diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp index 65b2330dc7..9fbeef8245 100644 --- a/be/src/olap/cold_data_compaction.cpp +++ b/be/src/olap/cold_data_compaction.cpp @@ -45,6 +45,10 @@ Status ColdDataCompaction::execute_compact_impl() { #endif SCOPED_ATTACH_TASK(_mem_tracker); int64_t permits = get_compaction_permits(); + std::shared_lock cooldown_conf_rlock(_tablet->get_cooldown_conf_lock()); + if (_tablet->cooldown_conf_unlocked().first != _tablet->replica_id()) { + return Status::Aborted("this replica is not cooldown replica"); + } RETURN_IF_ERROR(do_compaction(permits)); _state = CompactionState::SUCCESS; return Status::OK(); @@ -62,16 +66,6 @@ Status ColdDataCompaction::pick_rowsets_to_compact() { Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) { UniqueId cooldown_meta_id = UniqueId::gen_uid(); - - // write remote tablet meta - std::shared_ptr<io::RemoteFileSystem> fs; - RETURN_IF_ERROR(get_remote_file_system(_tablet->storage_policy_id(), &fs)); - std::vector<RowsetMetaSharedPtr> to_deletes; - for (auto& rs : _input_rowsets) { - to_deletes.emplace_back(rs->rowset_meta()); - } - RETURN_IF_ERROR(_tablet->write_cooldown_meta(fs, cooldown_meta_id, - _output_rowset->rowset_meta(), to_deletes)); { std::lock_guard wlock(_tablet->get_header_lock()); // Merged cooldowned rowsets MUST NOT be managed by version graph, they will be reclaimed by `remove_unused_remote_files`. @@ -85,6 +79,9 @@ Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) { std::shared_lock rlock(_tablet->get_header_lock()); _tablet->save_meta(); } + // write remote tablet meta + // TODO(AlexYue): async call `write_cooldown_meta` + RETURN_IF_ERROR(_tablet->write_cooldown_meta()); return Status::OK(); } diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 367d0d8ba7..fbc4f24557 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -98,6 +98,17 @@ static const std::string PENDING_DELTA_PREFIX = "pending_delta"; static const std::string INCREMENTAL_DELTA_PREFIX = "incremental_delta"; static const std::string CLONE_PREFIX = "clone"; +// define paths +static inline std::string remote_tablet_path(int64_t tablet_id) { + // data/{tablet_id} + return fmt::format("{}/{}", DATA_PREFIX, tablet_id); +} +static inline std::string remote_tablet_meta_path(int64_t tablet_id, int64_t replica_id, + int64_t cooldown_term) { + // data/{tablet_id}/{replica_id}.{cooldown_term}.meta + return fmt::format("{}/{}.{}.meta", remote_tablet_path(tablet_id), replica_id, cooldown_term); +} + static const std::string TABLET_UID = "tablet_uid"; static const std::string STORAGE_NAME = "storage_name"; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 36e67bc1f9..e621e893f8 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -797,7 +797,7 @@ void StorageEngine::_cold_data_compaction_producer_callback() { tablet_to_follow.reserve(n + 1); for (auto& t : tablets) { - if (t->replica_id() == t->cooldown_replica_id()) { + if (t->replica_id() == t->cooldown_conf_unlocked().first) { auto score = t->calc_cold_data_compaction_score(); if (score < 4) { continue; diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 26d4b0b7ce..4f459d8365 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -69,16 +69,6 @@ std::string BetaRowset::segment_file_path(const std::string& rowset_dir, const R return fmt::format("{}/{}_{}.dat", rowset_dir, rowset_id.to_string(), segment_id); } -std::string BetaRowset::remote_tablet_path(int64_t tablet_id) { - // data/{tablet_id} - return fmt::format("{}/{}", DATA_PREFIX, tablet_id); -} - -std::string BetaRowset::remote_tablet_meta_path(int64_t tablet_id, int64_t replica_id) { - // data/{tablet_id}/{replica_id}.meta - return fmt::format("{}/{}.meta", remote_tablet_path(tablet_id), replica_id); -} - std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id, int segment_id) { // data/{tablet_id}/{rowset_id}_{seg_num}.dat diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index c2a86f13c8..5401c60860 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -61,10 +61,6 @@ public: static std::string remote_segment_path(int64_t tablet_id, const std::string& rowset_id, int segment_id); - static std::string remote_tablet_path(int64_t tablet_id); - - static std::string remote_tablet_meta_path(int64_t tablet_id, int64_t replica_id); - Status remove() override; Status link_files_to(const std::string& dir, RowsetId new_rowset_id, diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 62d8ceae60..0563e3dd76 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1409,13 +1409,14 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info, tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory()); tablet_info->__set_replica_id(replica_id()); tablet_info->__set_remote_data_size(_tablet_meta->tablet_remote_size()); - if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id() > 0) { - tablet_info->__set_cooldown_replica_id(_cooldown_replica_id); + if (_tablet_meta->cooldown_meta_id().initialized()) { // has cooldowned data tablet_info->__set_cooldown_term(_cooldown_term); - } - if (_tablet_meta->cooldown_meta_id().initialized()) { tablet_info->__set_cooldown_meta_id(_tablet_meta->cooldown_meta_id().to_thrift()); } + if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id() > 0) { + // tablet may not have cooldowned data, but the storage policy is set + tablet_info->__set_cooldown_term(_cooldown_term); + } } // should use this method to get a copy of current tablet meta @@ -1633,7 +1634,7 @@ void Tablet::_init_context_common_fields(RowsetWriterContext& context) { context.rowset_type = StorageEngine::instance()->default_rowset_type(); } if (context.fs != nullptr && context.fs->type() != io::FileSystemType::LOCAL) { - context.rowset_dir = BetaRowset::remote_tablet_path(tablet_id()); + context.rowset_dir = remote_tablet_path(tablet_id()); } else { context.rowset_dir = tablet_path(); } @@ -1648,37 +1649,38 @@ Status Tablet::create_rowset(const RowsetMetaSharedPtr& rowset_meta, RowsetShare Status Tablet::cooldown() { std::unique_lock schema_change_lock(_schema_change_lock, std::try_to_lock); if (!schema_change_lock.owns_lock()) { - LOG(WARNING) << "Failed to own schema_change_lock. tablet=" << tablet_id(); - return Status::Error<TRY_LOCK_FAILED>(); + return Status::Error<TRY_LOCK_FAILED>("try schema_change_lock failed"); } // Check executing serially with compaction task. std::unique_lock base_compaction_lock(_base_compaction_lock, std::try_to_lock); if (!base_compaction_lock.owns_lock()) { - LOG(WARNING) << "Failed to own base_compaction_lock. tablet=" << tablet_id(); - return Status::Error<TRY_LOCK_FAILED>(); + return Status::Error<TRY_LOCK_FAILED>("try base_compaction_lock failed"); } std::unique_lock cumu_compaction_lock(_cumulative_compaction_lock, std::try_to_lock); if (!cumu_compaction_lock.owns_lock()) { - LOG(WARNING) << "Failed to own cumu_compaction_lock. tablet=" << tablet_id(); - return Status::Error<TRY_LOCK_FAILED>(); + return Status::Error<TRY_LOCK_FAILED>("try cumu_compaction_lock failed"); } - int64_t cooldown_replica_id = _cooldown_replica_id; - if (cooldown_replica_id <= 0) { // wait for FE to push cooldown conf + std::shared_lock cooldown_conf_rlock(_cooldown_conf_lock); + if (_cooldown_replica_id <= 0) { // wait for FE to push cooldown conf return Status::InternalError("invalid cooldown_replica_id"); } - std::shared_ptr<io::RemoteFileSystem> fs; - RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs)); - - if (cooldown_replica_id == replica_id()) { - RETURN_IF_ERROR(_cooldown_data(fs)); + if (_cooldown_replica_id == replica_id()) { + // this replica is cooldown replica + RETURN_IF_ERROR(_cooldown_data()); } else { - RETURN_IF_ERROR(_follow_cooldowned_data(fs, cooldown_replica_id)); + // try to follow cooldowned data from cooldown replica + RETURN_IF_ERROR(_follow_cooldowned_data()); } return Status::OK(); } -Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_fs) { +// hold SHARED `cooldown_conf_lock` +Status Tablet::_cooldown_data() { + DCHECK(_cooldown_replica_id == replica_id()); + + std::shared_ptr<io::RemoteFileSystem> dest_fs; + RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &dest_fs)); auto old_rowset = pick_cooldown_rowset(); if (!old_rowset) { return Status::InternalError("cannot pick cooldown rowset in tablet {}", tablet_id()); @@ -1711,13 +1713,6 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_ new_rowset_meta->set_fs(dest_fs); new_rowset_meta->set_creation_time(time(nullptr)); UniqueId cooldown_meta_id = UniqueId::gen_uid(); - - // upload cooldowned rowset meta to remote fs - st = write_cooldown_meta(dest_fs, cooldown_meta_id, new_rowset_meta, {}); - if (!st.ok()) { - return st; - } - RowsetSharedPtr new_rowset; RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta, &new_rowset); @@ -1735,13 +1730,17 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_ std::unique_lock meta_rlock(_meta_lock); save_meta(); } + // upload cooldowned rowset meta to remote fs + // TODO(AlexYue): async call `write_cooldown_meta` + RETURN_IF_ERROR(write_cooldown_meta()); return Status::OK(); } +// hold SHARED `cooldown_conf_lock` Status Tablet::_read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs, - int64_t cooldown_replica_id, TabletMetaPB* tablet_meta_pb) { + TabletMetaPB* tablet_meta_pb) { std::string remote_meta_path = - BetaRowset::remote_tablet_meta_path(tablet_id(), cooldown_replica_id); + remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term); IOContext io_ctx; io::FileReaderSPtr tablet_meta_reader; RETURN_IF_ERROR(fs->open_file(remote_meta_path, &tablet_meta_reader, &io_ctx)); @@ -1756,46 +1755,50 @@ Status Tablet::_read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& return Status::OK(); } -Status Tablet::write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs, - UniqueId cooldown_meta_id, - const RowsetMetaSharedPtr& new_rs_meta, - const std::vector<RowsetMetaSharedPtr>& to_deletes) { - std::unordered_set<Version, HashOfVersion> to_delete_set; - for (auto& rs_meta : to_deletes) { - to_delete_set.emplace(rs_meta->version()); +// `rs_metas` MUST already be sorted by `RowsetMeta::comparator` +Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>& rs_metas) { + if (rs_metas.size() < 2) { + return Status::OK(); + } + auto prev = rs_metas.begin(); + for (auto it = rs_metas.begin() + 1; it != rs_metas.end(); ++it) { + if ((*prev)->end_version() + 1 != (*it)->start_version()) { + return Status::InternalError("versions are not continuity: prev={} cur={}", + (*prev)->version().to_string(), + (*it)->version().to_string()); + } + prev = it; + } + return Status::OK(); +} + +Status Tablet::write_cooldown_meta() { + auto [cooldown_replica_id, cooldown_term] = cooldown_conf(); + if (cooldown_replica_id != replica_id()) { + return Status::Aborted("this replica is not cooldown replica"); } + std::shared_ptr<io::RemoteFileSystem> fs; + RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs)); + std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas; + UniqueId cooldown_meta_id; { std::shared_lock meta_rlock(_meta_lock); for (auto& rs_meta : _tablet_meta->all_rs_metas()) { if (!rs_meta->is_local()) { - if (to_delete_set.find(rs_meta->version()) != to_delete_set.end()) { - continue; - } - cooldowned_rs_metas.emplace_back(rs_meta); + cooldowned_rs_metas.push_back(rs_meta); } } + cooldown_meta_id = _tablet_meta->cooldown_meta_id(); } - cooldowned_rs_metas.emplace_back(new_rs_meta); - std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(), RowsetMeta::comparator); - - // check_version_continuity - if (!cooldowned_rs_metas.empty()) { - RowsetMetaSharedPtr prev_rowset_meta = cooldowned_rs_metas.front(); - for (size_t i = 1; i < cooldowned_rs_metas.size(); ++i) { - RowsetMetaSharedPtr rowset_meta = cooldowned_rs_metas[i]; - if (rowset_meta->start_version() != prev_rowset_meta->end_version() + 1) { - LOG(WARNING) << "There are missed versions among rowsets. " - << "prev_rowset_meta version=" << prev_rowset_meta->start_version() - << "-" << prev_rowset_meta->end_version() - << ", rowset_meta version=" << rowset_meta->start_version() << "-" - << rowset_meta->end_version(); - return Status::Error<CUMULATIVE_MISS_VERSION>(); - } - prev_rowset_meta = rowset_meta; - } + if (cooldowned_rs_metas.empty()) { + LOG(INFO) << "no cooldown meta to write, tablet_id=" << tablet_id(); + return Status::OK(); } + std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(), RowsetMeta::comparator); + DCHECK(cooldowned_rs_metas.front()->start_version() == 0); + RETURN_IF_ERROR(check_version_continuity(cooldowned_rs_metas)); TabletMetaPB tablet_meta_pb; auto rs_metas = tablet_meta_pb.mutable_rs_metas(); @@ -1807,7 +1810,7 @@ Status Tablet::write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo); std::string remote_meta_path = - BetaRowset::remote_tablet_meta_path(tablet_id(), _tablet_meta->replica_id()); + remote_tablet_meta_path(tablet_id(), cooldown_replica_id, cooldown_term); io::FileWriterPtr tablet_meta_writer; RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer)); auto val = tablet_meta_pb.SerializeAsString(); @@ -1815,11 +1818,15 @@ Status Tablet::write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& return tablet_meta_writer->close(); } -Status Tablet::_follow_cooldowned_data(const std::shared_ptr<io::RemoteFileSystem>& fs, - int64_t cooldown_replica_id) { +// hold SHARED `cooldown_conf_lock` +Status Tablet::_follow_cooldowned_data() { + DCHECK(_cooldown_replica_id != replica_id()); LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id() - << " cooldown_replica_id=" << cooldown_replica_id + << " cooldown_replica_id=" << _cooldown_replica_id << " local replica=" << replica_id(); + + std::shared_ptr<io::RemoteFileSystem> fs; + RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs)); // MUST executing serially with cold data compaction, because compaction input rowsets may be deleted by this function std::unique_lock cold_compaction_lock(_cold_compaction_lock, std::try_to_lock); if (!cold_compaction_lock.owns_lock()) { @@ -1827,7 +1834,7 @@ Status Tablet::_follow_cooldowned_data(const std::shared_ptr<io::RemoteFileSyste } TabletMetaPB cooldown_meta_pb; - RETURN_IF_ERROR(_read_cooldown_meta(fs, cooldown_replica_id, &cooldown_meta_pb)); + RETURN_IF_ERROR(_read_cooldown_meta(fs, &cooldown_meta_pb)); DCHECK(cooldown_meta_pb.rs_metas_size() > 0); if (_tablet_meta->cooldown_meta_id() == cooldown_meta_pb.cooldown_meta_id()) { // cooldowned rowsets are same, no need to follow @@ -2047,11 +2054,18 @@ void Tablet::remove_unused_remote_files() { DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id); DCHECK(dest_fs->type() != io::FileSystemType::LOCAL); - Status st; + std::shared_ptr<io::RemoteFileSystem> fs; + auto st = get_remote_file_system(t->storage_policy_id(), &fs); + if (!st.ok()) { + LOG(WARNING) << "encounter error when remove unused remote files, tablet_id=" + << t->tablet_id() << " : " << st; + return; + } + std::vector<io::Path> files; // FIXME(plat1ko): What if user reset resource in storage policy to another resource? // Maybe we should also list files in previously uploaded resources. - st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()), &files); + st = dest_fs->list(remote_tablet_path(t->tablet_id()), &files); if (!st.ok()) { LOG(WARNING) << "encounter error when remove unused remote files, tablet_id=" << t->tablet_id() << " : " << st; @@ -2075,8 +2089,13 @@ void Tablet::remove_unused_remote_files() { } cooldown_meta_id = t->_tablet_meta->cooldown_meta_id(); } - // {replica_id}.meta - std::string remote_meta_path = std::to_string(t->replica_id()) + ".meta"; + auto [cooldown_replica_id, cooldown_term] = t->cooldown_conf(); + if (cooldown_replica_id != t->replica_id()) { + return; + } + // {cooldown_replica_id}.{cooldown_term}.meta + std::string remote_meta_path = + fmt::format("{}.{}.meta", cooldown_replica_id, cooldown_term); // filter out the paths that should be reserved // clang-format off files.erase(std::remove_if(files.begin(), files.end(), [&](io::Path& path) { @@ -2111,7 +2130,7 @@ void Tablet::remove_unused_remote_files() { buffer.insert({t->tablet_id(), {std::move(dest_fs), std::move(files)}}); auto& info = req.confirm_list.emplace_back(); info.__set_tablet_id(t->tablet_id()); - info.__set_cooldown_replica_id(t->replica_id()); + info.__set_cooldown_replica_id(cooldown_replica_id); info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift()); }; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index a4114d2ca5..c14f27e748 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -17,6 +17,7 @@ #pragma once +#include <atomic> #include <functional> #include <memory> #include <mutex> @@ -308,8 +309,6 @@ public: //////////////////////////////////////////////////////////////////////////// // begin cooldown functions //////////////////////////////////////////////////////////////////////////// - int64_t cooldown_replica_id() const { return _cooldown_replica_id; } - // Cooldown to remote fs. Status cooldown(); @@ -317,7 +316,22 @@ public: bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size); - void update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) { + std::pair<int64_t, int64_t> cooldown_conf() const { + std::shared_lock rlock(_cooldown_conf_lock); + return {_cooldown_replica_id, _cooldown_term}; + } + + std::pair<int64_t, int64_t> cooldown_conf_unlocked() const { + return {_cooldown_replica_id, _cooldown_term}; + } + + // return true if update success + bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) { + std::unique_lock wlock(_cooldown_conf_lock, std::try_to_lock); + if (!wlock.owns_lock()) { + LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" << tablet_id(); + return false; + } if (cooldown_term > _cooldown_term) { LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id() << " cooldown_replica_id: " << _cooldown_replica_id << " -> " @@ -325,7 +339,9 @@ public: << cooldown_term; _cooldown_replica_id = cooldown_replica_id; _cooldown_term = cooldown_term; + return true; } + return false; } Status remove_all_remote_rowsets(); @@ -344,6 +360,10 @@ public: uint32_t calc_cold_data_compaction_score() const; std::mutex& get_cold_compaction_lock() { return _cold_compaction_lock; } + + std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; } + + Status write_cooldown_meta(); //////////////////////////////////////////////////////////////////////////// // end cooldown functions //////////////////////////////////////////////////////////////////////////// @@ -412,10 +432,6 @@ public: return config::max_tablet_io_errors > 0 && _io_error_times >= config::max_tablet_io_errors; } - Status write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs, - UniqueId cooldown_meta_id, const RowsetMetaSharedPtr& new_rs_meta, - const std::vector<RowsetMetaSharedPtr>& to_deletes); - private: Status _init_once_action(); void _print_missed_versions(const std::vector<Version>& missed_versions) const; @@ -455,11 +471,10 @@ private: //////////////////////////////////////////////////////////////////////////// // begin cooldown functions //////////////////////////////////////////////////////////////////////////// - Status _cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_fs); - Status _follow_cooldowned_data(const std::shared_ptr<io::RemoteFileSystem>& fs, - int64_t cooldown_replica_id); + Status _cooldown_data(); + Status _follow_cooldowned_data(); Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs, - int64_t cooldown_replica_id, TabletMetaPB* tablet_meta_pb); + TabletMetaPB* tablet_meta_pb); //////////////////////////////////////////////////////////////////////////// // end cooldown functions //////////////////////////////////////////////////////////////////////////// @@ -539,6 +554,13 @@ private: // cooldown related int64_t _cooldown_replica_id = -1; int64_t _cooldown_term = -1; + // `_cooldown_conf_lock` is used to serialize update cooldown conf and all operations that: + // 1. read cooldown conf + // 2. upload rowsets to remote storage + // 3. update cooldown meta id + mutable std::shared_mutex _cooldown_conf_lock; + // `_cold_compaction_lock` is used to serialize cold data compaction and all operations that + // may delete compaction input rowsets. std::mutex _cold_compaction_lock; DISALLOW_COPY_AND_ASSIGN(Tablet); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index fb8b834787..0f694348ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -111,6 +111,7 @@ public class Replica implements Writable { private boolean bad = false; private TUniqueId cooldownMetaId; + private long cooldownTerm = -1; /* * If set to true, with means this replica need to be repaired. explicitly. @@ -246,6 +247,14 @@ public class Replica implements Writable { this.cooldownMetaId = cooldownMetaId; } + public long getCooldownTerm() { + return cooldownTerm; + } + + public void setCooldownTerm(long cooldownTerm) { + this.cooldownTerm = cooldownTerm; + } + public boolean needFurtherRepair() { if (needFurtherRepair && System.currentTimeMillis() - this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) { return true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 726e86544a..2068b93755 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -190,10 +190,11 @@ public class TabletInvertedIndex { } } - if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownReplicaId()) { + if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownTerm()) { handleCooldownConf(tabletMeta, backendTabletInfo, cooldownConfToPush, cooldownConfToUpdate); replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId()); + replica.setCooldownTerm(backendTabletInfo.getCooldownTerm()); } long partitionId = tabletMeta.getPartitionId(); @@ -395,7 +396,7 @@ public class TabletInvertedIndex { return; } - if (cooldownConf.first != beTabletInfo.getCooldownReplicaId()) { + if (beTabletInfo.getCooldownTerm() < cooldownConf.second) { CooldownConf conf = new CooldownConf(beTabletInfo.tablet_id, cooldownConf.first, cooldownConf.second); synchronized (cooldownConfToPush) { cooldownConfToPush.add(conf); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 1f4a8715e9..ec22328c55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -1147,18 +1147,40 @@ public class ReportHandler extends Daemon { if (backendTabletInfo.isSetCooldownMetaId()) { // replica has cooldowned data do { - if (backendTabletInfo.getReplicaId() == tablet.getCooldownConf().first) { + Pair<Long, Long> cooldownConf = tablet.getCooldownConf(); + if (backendTabletInfo.getCooldownTerm() > cooldownConf.second) { + // should not be here + LOG.warn("report cooldownTerm({}) > cooldownTerm in TabletMeta({}), tabletId={}", + backendTabletInfo.getCooldownTerm(), cooldownConf.second, tabletId); + return false; + } + if (backendTabletInfo.getReplicaId() == cooldownConf.first) { // this replica is true cooldown replica, so replica's cooldowned data must not be deleted break; } - if (backendTabletInfo.getReplicaId() != backendTabletInfo.getCooldownReplicaId() - && Env.getCurrentInvertedIndex().getReplicas(tabletId).stream() - .anyMatch(r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) { - // this replica can not cooldown data, and shares same cooldowned data with others replica, - // so replica's cooldowned data must not be deleted - break; + List<Replica> replicas = Env.getCurrentInvertedIndex().getReplicas(tabletId); + if (backendTabletInfo.getCooldownTerm() <= 0) { + if (replicas.stream().anyMatch( + r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) { + // this backend is just restarted, and shares same cooldowned data with others replica, + // so replica's cooldowned data must not be deleted + break; + } + } + long minCooldownTerm = Long.MAX_VALUE; + for (Replica r : replicas) { + minCooldownTerm = Math.min(r.getCooldownTerm(), minCooldownTerm); + } + if (backendTabletInfo.getCooldownTerm() >= minCooldownTerm) { + if (replicas.stream().anyMatch( + r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) { + // this replica shares same cooldowned data with others replica, and won't follow data + // of lower cooldown term, so replica's cooldowned data must not be deleted + break; + } } - LOG.warn("replica's cooldowned data may have been deleted"); + LOG.warn("replica's cooldowned data may have been deleted. tabletId={}, replicaId={}", tabletId, + replicaId); return false; } while (false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 52341ebf45..773bae3f7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -48,6 +48,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Pair; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherException; import org.apache.doris.common.ThriftServerContext; @@ -220,9 +221,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { return; } // check cooldownReplicaId - long cooldownReplicaId = tablet.getCooldownConf().first; - if (cooldownReplicaId != info.cooldown_replica_id) { - LOG.info("cooldown replica id not match({} vs {}), tablet={}", cooldownReplicaId, + Pair<Long, Long> cooldownConf = tablet.getCooldownConf(); + if (cooldownConf.first != info.cooldown_replica_id) { + LOG.info("cooldown replica id not match({} vs {}), tablet={}", cooldownConf.first, info.cooldown_replica_id, info.tablet_id); return; } @@ -239,6 +240,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { LOG.info("replica is not alive, tablet={}, replica={}", info.tablet_id, replica.getId()); return; } + if (replica.getCooldownTerm() != cooldownConf.second) { + LOG.info("replica's cooldown term not match({} vs {}), tablet={}", cooldownConf.second, + replica.getCooldownTerm(), info.tablet_id); + return; + } if (!info.cooldown_meta_id.equals(replica.getCooldownMetaId())) { LOG.info("cooldown meta id are not same, tablet={}", info.tablet_id); return; diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 3c1d4ced03..99ca74a22b 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -42,7 +42,7 @@ struct TTabletInfo { 15: optional Types.TReplicaId replica_id // data size on remote storage 16: optional Types.TSize remote_data_size - 17: optional Types.TReplicaId cooldown_replica_id + // 17: optional Types.TReplicaId cooldown_replica_id // 18: optional bool is_cooldown 19: optional i64 cooldown_term 20: optional Types.TUniqueId cooldown_meta_id --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org