This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 52f9e03eea [fix](cooldown) Use `pending_remote_rowsets` to avoid deleting rowset files being uploaded (#16803) 52f9e03eea is described below commit 52f9e03eea17b3ed3351f6229e3e901d154031ed Author: plat1ko <platonekos...@gmail.com> AuthorDate: Tue Feb 21 21:58:20 2023 +0800 [fix](cooldown) Use `pending_remote_rowsets` to avoid deleting rowset files being uploaded (#16803) --- be/src/olap/cold_data_compaction.cpp | 2 +- be/src/olap/compaction.cpp | 62 ++++++++++++++++-------------------- be/src/olap/tablet.cpp | 56 +++++++++++++++++++------------- be/src/olap/tablet.h | 7 ++-- 4 files changed, 67 insertions(+), 60 deletions(-) diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp index 9f24c9c170..9a92d9eead 100644 --- a/be/src/olap/cold_data_compaction.cpp +++ b/be/src/olap/cold_data_compaction.cpp @@ -80,7 +80,7 @@ Status ColdDataCompaction::modify_rowsets() { // TODO(plat1ko): process primary key _tablet->tablet_meta()->set_cooldown_meta_id(cooldown_meta_id); } - + Tablet::erase_pending_remote_rowset(_output_rowset->rowset_id().to_string()); { std::shared_lock rlock(_tablet->get_header_lock()); _tablet->save_meta(); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index d585516954..ecfc3d8e56 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -272,6 +272,9 @@ Status Compaction::do_compaction_impl(int64_t permits) { bool vertical_compaction = should_vertical_compaction(); RETURN_NOT_OK(construct_input_rowset_readers()); RETURN_NOT_OK(construct_output_rowset_writer(vertical_compaction)); + if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { + Tablet::add_pending_remote_rowset(_output_rs_writer->rowset_id().to_string()); + } TRACE("prepare finished"); // 2. write merged rows to output rowset @@ -282,45 +285,35 @@ Status Compaction::do_compaction_impl(int64_t permits) { stats.rowid_conversion = &_rowid_conversion; } - auto build_output_rowset = [&]() { - Status res; - if (use_vectorized_compaction) { - if (vertical_compaction) { - res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), &stats); - } else { - res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, - _input_rs_readers, _output_rs_writer.get(), &stats); - } + Status res; + if (use_vectorized_compaction) { + if (vertical_compaction) { + res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, + _input_rs_readers, _output_rs_writer.get(), + get_avg_segment_rows(), &stats); } else { - LOG(FATAL) << "Only support vectorized compaction"; + res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, + _input_rs_readers, _output_rs_writer.get(), &stats); } + } else { + LOG(FATAL) << "Only support vectorized compaction"; + } - if (!res.ok()) { - LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res - << ", tablet=" << _tablet->full_name() - << ", output_version=" << _output_version; - return res; - } - TRACE("merge rowsets finished"); - TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows); - TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows); - - _output_rowset = _output_rs_writer->build(); - if (_output_rowset == nullptr) { - LOG(WARNING) << "rowset writer build failed. writer version:" - << ", output_version=" << _output_version; - return Status::Error<ROWSET_BUILDER_INIT>(); - } + if (!res.ok()) { + LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res + << ", tablet=" << _tablet->full_name() + << ", output_version=" << _output_version; return res; - }; + } + TRACE("merge rowsets finished"); + TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows); + TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows); - if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { - std::shared_lock slock(_tablet->get_remote_files_lock()); - RETURN_IF_ERROR(build_output_rowset()); - } else { - RETURN_IF_ERROR(build_output_rowset()); + _output_rowset = _output_rs_writer->build(); + if (_output_rowset == nullptr) { + LOG(WARNING) << "rowset writer build failed. writer version:" + << ", output_version=" << _output_version; + return Status::Error<ROWSET_BUILDER_INIT>(); } TRACE_COUNTER_INCREMENT("output_rowset_data_size", _output_rowset->data_disk_size()); @@ -457,6 +450,7 @@ Status Compaction::modify_rowsets() { void Compaction::gc_output_rowset() { if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) { if (!_output_rowset->is_local()) { + Tablet::erase_pending_remote_rowset(_output_rowset->rowset_id().to_string()); _tablet->record_unused_remote_rowset(_output_rowset->rowset_id(), _output_rowset->rowset_meta()->resource_id(), _output_rowset->num_segments()); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 8cbafa7f6e..75c1bff53e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1728,20 +1728,17 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_ return Status::InternalError("cannot pick cooldown rowset in tablet {}", tablet_id()); } RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id(); - - auto start = std::chrono::steady_clock::now(); - + add_pending_remote_rowset(new_rowset_id.to_string()); Status st; - { - std::shared_lock slock(_remote_files_lock, std::try_to_lock); - if (!slock.owns_lock()) { - return Status::Status::Error<TRY_LOCK_FAILED>("try remote_files_lock failed"); + Defer defer {[&] { + if (!st.ok()) { + erase_pending_remote_rowset(new_rowset_id.to_string()); + // reclaim the incomplete rowset data in remote storage + record_unused_remote_rowset(new_rowset_id, dest_fs->id(), old_rowset->num_segments()); } - st = old_rowset->upload_to(dest_fs.get(), new_rowset_id); - } - if (!st.ok()) { - // reclaim the incomplete rowset data in remote storage - record_unused_remote_rowset(new_rowset_id, dest_fs->id(), old_rowset->num_segments()); + }}; + auto start = std::chrono::steady_clock::now(); + if (st = old_rowset->upload_to(dest_fs.get(), new_rowset_id); !st.ok()) { return st; } @@ -1760,7 +1757,10 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_ UniqueId cooldown_meta_id = UniqueId::gen_uid(); // upload cooldowned rowset meta to remote fs - RETURN_IF_ERROR(write_cooldown_meta(dest_fs, cooldown_meta_id, new_rowset_meta, {})); + st = write_cooldown_meta(dest_fs, cooldown_meta_id, new_rowset_meta, {}); + if (!st.ok()) { + return st; + } RowsetSharedPtr new_rowset; RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta, &new_rowset); @@ -1774,6 +1774,7 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_ _tablet_meta->set_cooldown_meta_id(cooldown_meta_id); } } + erase_pending_remote_rowset(new_rowset_id.to_string()); { std::unique_lock meta_rlock(_meta_lock); save_meta(); @@ -2047,6 +2048,18 @@ Status Tablet::remove_all_remote_rowsets() { gc_pb.SerializeAsString()); } +static std::unordered_set<std::string> s_pending_remote_rowsets; +static std::mutex s_pending_remote_rowsets_mtx; + +void Tablet::add_pending_remote_rowset(std::string rowset_id) { + std::lock_guard lock(s_pending_remote_rowsets_mtx); + s_pending_remote_rowsets.insert(std::move(rowset_id)); +} +void Tablet::erase_pending_remote_rowset(const std::string& rowset_id) { + std::lock_guard lock(s_pending_remote_rowsets_mtx); + s_pending_remote_rowsets.erase(rowset_id); +} + void Tablet::remove_unused_remote_files() { auto tablets = StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) { return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() && @@ -2080,16 +2093,9 @@ void Tablet::remove_unused_remote_files() { Status st; std::vector<io::Path> files; - { - std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock); - if (!xlock.owns_lock()) { - LOG(WARNING) << "try remote_files_lock failed. tablet_id=" << t->tablet_id(); - return; - } - // FIXME(plat1ko): What if user reset resource in storage policy to another resource? - // Maybe we should also list files in previously uploaded resources. - st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()), &files); - } + // FIXME(plat1ko): What if user reset resource in storage policy to another resource? + // Maybe we should also list files in previously uploaded resources. + st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()), &files); if (!st.ok()) { LOG(WARNING) << "encounter error when remove unused remote files, tablet_id=" << t->tablet_id() << " : " << st; @@ -2099,6 +2105,10 @@ void Tablet::remove_unused_remote_files() { } // get all cooldowned rowsets std::unordered_set<std::string> cooldowned_rowsets; + { + std::lock_guard lock(s_pending_remote_rowsets_mtx); + cooldowned_rowsets = s_pending_remote_rowsets; + } UniqueId cooldown_meta_id; { std::shared_lock rlock(t->_meta_lock); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 82f42dccdd..73a200b383 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -328,7 +328,11 @@ public: static void remove_unused_remote_files(); - std::shared_mutex& get_remote_files_lock() { return _remote_files_lock; } + // If a rowset is to be written to remote filesystem, MUST add it to `pending_remote_rowsets` before uploading, + // and then erase it from `pending_remote_rowsets` after it has been insert to the Tablet. + // `remove_unused_remote_files` MUST NOT delete files of these pending rowsets. + static void add_pending_remote_rowset(std::string rowset_id); + static void erase_pending_remote_rowset(const std::string& rowset_id); uint32_t calc_cold_data_compaction_score() const; @@ -524,7 +528,6 @@ private: // cooldown related int64_t _cooldown_replica_id = -1; int64_t _cooldown_term = -1; - std::shared_mutex _remote_files_lock; std::mutex _cold_compaction_lock; DISALLOW_COPY_AND_ASSIGN(Tablet); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org