This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a68fc551f0 [bug](cooldown) Fix async_write_cooldown_meta and snapshot cooldowned version not continuous bug (#20437) a68fc551f0 is described below commit a68fc551f0cf6d7c3315a04703ed459dbb117877 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Thu Jun 8 15:35:35 2023 +0800 [bug](cooldown) Fix async_write_cooldown_meta and snapshot cooldowned version not continuous bug (#20437) --- be/src/agent/task_worker_pool.cpp | 4 +- be/src/common/status.h | 2 + be/src/olap/rowset/rowset_meta.h | 4 -- be/src/olap/snapshot_manager.cpp | 45 +++++++++++++++++--- be/src/olap/tablet.cpp | 66 ++++++++++++++++++++++------- be/src/olap/tablet.h | 21 ++------- be/test/io/cache/remote_file_cache_test.cpp | 1 - be/test/olap/rowset/beta_rowset_test.cpp | 3 -- 8 files changed, 99 insertions(+), 47 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 37a52e50e6..ba160103ed 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1178,7 +1178,9 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() { continue; } if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term, - cooldown_conf.cooldown_replica_id)) { + cooldown_conf.cooldown_replica_id) && + cooldown_conf.cooldown_replica_id == tablet->replica_id() && + tablet->tablet_meta()->cooldown_meta_id().initialized()) { Tablet::async_write_cooldown_meta(tablet); } } diff --git a/be/src/common/status.h b/be/src/common/status.h index 569d542d25..538b7de24f 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -423,6 +423,8 @@ public: return code == _code; } + void set_code(int code) { _code = code; } + bool ok() const { return _code == ErrorCode::OK; } bool is_io_error() const { diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index b132c40db2..6736c53cac 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -109,10 +109,6 @@ public: const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); } - void set_resource_id(std::string resource_id) { - _rowset_meta_pb.set_resource_id(std::move(resource_id)); - } - bool is_local() const { return !_rowset_meta_pb.has_resource_id(); } RowsetId rowset_id() const { return _rowset_id; } diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index d41d5561c0..d19af18cca 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -347,6 +347,23 @@ Status SnapshotManager::_link_index_and_data_files( return res; } +// `rs_metas` MUST already be sorted by `RowsetMeta::comparator` +Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) { + if (rowsets.size() < 2) { + return Status::OK(); + } + auto prev = rowsets.begin(); + for (auto it = rowsets.begin() + 1; it != rowsets.end(); ++it) { + if ((*prev)->end_version() + 1 != (*it)->start_version()) { + return Status::InternalError("versions are not continuity: prev={} cur={}", + (*prev)->version().to_string(), + (*it)->version().to_string()); + } + prev = it; + } + return Status::OK(); +} + Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet, const TSnapshotRequest& request, string* snapshot_path, @@ -493,11 +510,29 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet } version = request.version; } - // get shortest version path - // it very important!!!! - // it means 0-version has to be a readable version graph - res = ref_tablet->capture_consistent_rowsets(Version(0, version), - &consistent_rowsets); + if (ref_tablet->tablet_meta()->cooldown_meta_id().initialized()) { + // Tablet has cooldowned data, MUST pick consistent rowsets with continuous cooldowned version + // Get max cooldowned version + int64_t max_cooldowned_version = -1; + for (auto& [v, rs] : ref_tablet->rowset_map()) { + if (rs->is_local()) continue; + consistent_rowsets.push_back(rs); + max_cooldowned_version = std::max(max_cooldowned_version, v.second); + } + DCHECK_GE(max_cooldowned_version, 1) << "tablet_id=" << ref_tablet->tablet_id(); + std::sort(consistent_rowsets.begin(), consistent_rowsets.end(), + Rowset::comparator); + res = check_version_continuity(consistent_rowsets); + if (res.ok() && max_cooldowned_version < version) { + // Pick consistent rowsets of remaining required version + res = ref_tablet->capture_consistent_rowsets( + {max_cooldowned_version + 1, version}, &consistent_rowsets); + } + } else { + // get shortest version path + res = ref_tablet->capture_consistent_rowsets(Version(0, version), + &consistent_rowsets); + } if (!res.ok()) { LOG(WARNING) << "fail to select versions to span. res=" << res; break; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index bd925672d0..224823deca 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -148,15 +148,19 @@ bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute( struct WriteCooldownMetaExecutors { WriteCooldownMetaExecutors(size_t executor_nums = 5); - static WriteCooldownMetaExecutors* GetInstance() { + static WriteCooldownMetaExecutors* get_instance() { static WriteCooldownMetaExecutors instance; return &instance; } void submit(TabletSharedPtr tablet); - size_t _get_executor_pos(int64_t tablet_id) const { return tablet_id % _executor_nums; }; + size_t _get_executor_pos(int64_t tablet_id) const { + return std::hash<int64_t>()(tablet_id) % _executor_nums; + }; + // Each executor is a mpsc to ensure uploads of the same tablet meta are not concurrent + // FIXME(AlexYue): Use mpsc instead of `ThreadPool` with 1 thread std::vector<std::unique_ptr<ThreadPool>> _executors; - std::unordered_set<int64_t> _pengding_tablets; + std::unordered_set<int64_t> _pending_tablets; std::mutex _latch; size_t _executor_nums; }; @@ -165,7 +169,7 @@ WriteCooldownMetaExecutors::WriteCooldownMetaExecutors(size_t executor_nums) : _executor_nums(executor_nums) { for (size_t i = 0; i < _executor_nums; i++) { std::unique_ptr<ThreadPool> pool; - ThreadPoolBuilder("AsyncWriteCooldownMetaExecutor") + ThreadPoolBuilder("WriteCooldownMetaExecutor") .set_min_threads(1) .set_max_threads(1) .set_max_queue_size(std::numeric_limits<int>::max()) @@ -187,16 +191,16 @@ void WriteCooldownMetaExecutors::WriteCooldownMetaExecutors::submit(TabletShared { // one tablet could at most have one cooldown task to be done std::unique_lock<std::mutex> lck {_latch}; - if (_pengding_tablets.count(tablet_id) > 0) { + if (_pending_tablets.count(tablet_id) > 0) { return; } - _pengding_tablets.insert(tablet_id); + _pending_tablets.insert(tablet_id); } auto async_write_task = [this, t = std::move(tablet)]() { { std::unique_lock<std::mutex> lck {_latch}; - _pengding_tablets.erase(t->tablet_id()); + _pending_tablets.erase(t->tablet_id()); } auto s = t->write_cooldown_meta(); if (s.ok()) { @@ -899,7 +903,7 @@ Status Tablet::capture_consistent_rowsets(const Version& spec_version, Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector<Version>& version_path, std::vector<RowsetSharedPtr>* rowsets) const { - DCHECK(rowsets != nullptr && rowsets->empty()); + DCHECK(rowsets != nullptr); rowsets->reserve(version_path.size()); for (auto& version : version_path) { bool is_find = false; @@ -1980,6 +1984,15 @@ Status Tablet::_cooldown_data() { if (!old_rowset) { return Status::InternalError("cannot pick cooldown rowset in tablet {}", tablet_id()); } + if (old_rowset->num_segments() < 1) { + // Empty rowset, just reset rowset's resource_id + std::lock_guard meta_wlock(_meta_lock); + old_rowset->rowset_meta()->set_fs(dest_fs); + LOG(INFO) << "cooldown empty rowset " << old_rowset->version() << " " + << old_rowset->rowset_id().to_string() << " to " << dest_fs->root_path().native() + << ", tablet_id=" << tablet_id(); + return Status::OK(); + } RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id(); add_pending_remote_rowset(new_rowset_id.to_string()); Status st; @@ -2004,7 +2017,6 @@ Status Tablet::_cooldown_data() { // gen a new rowset auto new_rowset_meta = std::make_shared<RowsetMeta>(*old_rowset->rowset_meta()); new_rowset_meta->set_rowset_id(new_rowset_id); - new_rowset_meta->set_resource_id(dest_fs->id()); new_rowset_meta->set_fs(dest_fs); new_rowset_meta->set_creation_time(time(nullptr)); UniqueId cooldown_meta_id = UniqueId::gen_uid(); @@ -2022,7 +2034,7 @@ Status Tablet::_cooldown_data() { } erase_pending_remote_rowset(new_rowset_id.to_string()); { - std::unique_lock meta_rlock(_meta_lock); + std::shared_lock meta_rlock(_meta_lock); SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); save_meta(); } @@ -2069,12 +2081,30 @@ Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>& rs_metas // It's guaranteed the write cooldown meta task would be invoked at the end unless BE crashes // one tablet would at most have one async task to be done void Tablet::async_write_cooldown_meta(TabletSharedPtr tablet) { - WriteCooldownMetaExecutors::GetInstance()->submit(std::move(tablet)); + WriteCooldownMetaExecutors::get_instance()->submit(std::move(tablet)); +} + +bool Tablet::update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) { + std::unique_lock wlock(_cooldown_conf_lock, std::try_to_lock); + if (!wlock.owns_lock()) { + LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" << tablet_id(); + return false; + } + if (cooldown_term <= _cooldown_term) return false; + LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id() + << " cooldown_replica_id: " << _cooldown_replica_id << " -> " << cooldown_replica_id + << ", cooldown_term: " << _cooldown_term << " -> " << cooldown_term; + _cooldown_replica_id = cooldown_replica_id; + _cooldown_term = cooldown_term; + return true; } -// hold SHARED `cooldown_conf_lock` Status Tablet::write_cooldown_meta() { - auto [cooldown_replica_id, cooldown_term] = cooldown_conf(); + std::shared_lock rlock(_cooldown_conf_lock); + if (_cooldown_replica_id != _tablet_meta->replica_id()) { + return Status::Aborted("not cooldown replcia({} vs {}) tablet_id={}", + _tablet_meta->replica_id(), _cooldown_replica_id, tablet_id()); + } std::shared_ptr<io::RemoteFileSystem> fs; RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs)); @@ -2096,7 +2126,12 @@ Status Tablet::write_cooldown_meta() { } std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(), RowsetMeta::comparator); DCHECK(cooldowned_rs_metas.front()->start_version() == 0); - RETURN_IF_ERROR(check_version_continuity(cooldowned_rs_metas)); + // If version not continuous, it must be a bug + if (auto st = check_version_continuity(cooldowned_rs_metas); !st.ok()) { + DCHECK(st.ok()) << st << " tablet_id=" << tablet_id(); + st.set_code(ABORTED); + return st; + } TabletMetaPB tablet_meta_pb; auto rs_metas = tablet_meta_pb.mutable_rs_metas(); @@ -2108,8 +2143,9 @@ Status Tablet::write_cooldown_meta() { tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo); std::string remote_meta_path = - remote_tablet_meta_path(tablet_id(), cooldown_replica_id, cooldown_term); + remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term); io::FileWriterPtr tablet_meta_writer; + // FIXME(plat1ko): What if object store permanently unavailable? RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer)); auto val = tablet_meta_pb.SerializeAsString(); RETURN_IF_ERROR(tablet_meta_writer->append({val.data(), val.size()})); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 9d2deeb5cc..cd4e21fb79 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -367,24 +367,8 @@ public: return {_cooldown_replica_id, _cooldown_term}; } - // return true if update success - bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) { - std::unique_lock wlock(_cooldown_conf_lock, std::try_to_lock); - if (!wlock.owns_lock()) { - LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" << tablet_id(); - return false; - } - if (cooldown_term > _cooldown_term) { - LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id() - << " cooldown_replica_id: " << _cooldown_replica_id << " -> " - << cooldown_replica_id << ", cooldown_term: " << _cooldown_term << " -> " - << cooldown_term; - _cooldown_replica_id = cooldown_replica_id; - _cooldown_term = cooldown_term; - return true; - } - return false; - } + // Return `true` if update success + bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id); Status remove_all_remote_rowsets(); @@ -406,6 +390,7 @@ public: std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; } static void async_write_cooldown_meta(TabletSharedPtr tablet); + // Return `ABORTED` if should not to retry again Status write_cooldown_meta(); //////////////////////////////////////////////////////////////////////////// // end cooldown functions diff --git a/be/test/io/cache/remote_file_cache_test.cpp b/be/test/io/cache/remote_file_cache_test.cpp index 5c1b932cc4..18fcb6de14 100644 --- a/be/test/io/cache/remote_file_cache_test.cpp +++ b/be/test/io/cache/remote_file_cache_test.cpp @@ -170,7 +170,6 @@ protected: // io::S3FileSystem::create will call connect, which will fail because s3_conf is empty. // but it does affect the following unit test ASSERT_FALSE(st.ok()) << st; - rowset.rowset_meta()->set_resource_id(resource_id); rowset.rowset_meta()->set_num_segments(1); rowset.rowset_meta()->set_fs(fs); rowset.rowset_meta()->set_tablet_id(tablet_id); diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp index 09c5019dfb..80e540f8ba 100644 --- a/be/test/olap/rowset/beta_rowset_test.cpp +++ b/be/test/olap/rowset/beta_rowset_test.cpp @@ -269,7 +269,6 @@ TEST_F(BetaRowsetTest, ReadTest) { Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true)); rowset.rowset_meta()->set_num_segments(1); - rowset.rowset_meta()->set_resource_id(resource_id); rowset.rowset_meta()->set_fs(fs); std::vector<segment_v2::SegmentSharedPtr> segments; @@ -284,7 +283,6 @@ TEST_F(BetaRowsetTest, ReadTest) { fs->_client.reset(new S3ClientMockGetError()); rowset.rowset_meta()->set_num_segments(1); - rowset.rowset_meta()->set_resource_id(resource_id); rowset.rowset_meta()->set_fs(fs); std::vector<segment_v2::SegmentSharedPtr> segments; @@ -299,7 +297,6 @@ TEST_F(BetaRowsetTest, ReadTest) { fs->_client.reset(new S3ClientMockGetErrorData()); rowset.rowset_meta()->set_num_segments(1); - rowset.rowset_meta()->set_resource_id(resource_id); rowset.rowset_meta()->set_fs(fs); std::vector<segment_v2::SegmentSharedPtr> segments; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org