platoneko commented on code in PR #10280: URL: https://github.com/apache/doris/pull/10280#discussion_r911968188
########## be/src/olap/tablet.cpp: ########## @@ -1636,4 +1652,160 @@ std::shared_ptr<MemTracker>& Tablet::get_compaction_mem_tracker(CompactionType c } } +Status Tablet::cooldown() { + std::unique_lock schema_change_lock(_schema_change_lock, std::try_to_lock); + if (!schema_change_lock.owns_lock()) { + LOG(WARNING) << "schema change is running. tablet=" << tablet_id(); + return Status::OLAPInternalError(OLAP_ERR_BE_TRY_BE_LOCK_ERROR); + } + // Check executing serially with compaction task. + std::unique_lock base_compaction_lock(_base_compaction_lock, std::try_to_lock); + if (!base_compaction_lock.owns_lock()) { + LOG(WARNING) << "base compaction is running. tablet=" << tablet_id(); + return Status::OLAPInternalError(OLAP_ERR_BE_TRY_BE_LOCK_ERROR); + } + std::unique_lock cumu_compaction_lock(_cumulative_compaction_lock, std::try_to_lock); + if (!cumu_compaction_lock.owns_lock()) { + LOG(WARNING) << "cumulative compaction is running. tablet=" << tablet_id(); + return Status::OLAPInternalError(OLAP_ERR_BE_TRY_BE_LOCK_ERROR); + } + auto dest_fs = io::FileSystemMap::instance()->get(cooldown_resource()); + if (!dest_fs) { + return Status::OLAPInternalError(OLAP_ERR_NOT_INITED); + } + DCHECK(dest_fs->type() == io::FileSystemType::S3); + auto old_rowset = pick_cooldown_rowset(); + if (!old_rowset) { + LOG(WARNING) << "Cannot pick cooldown rowset in tablet " << tablet_id(); + return Status::OK(); + } + RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id(); + + auto start = std::chrono::steady_clock::now(); + + RETURN_IF_ERROR(old_rowset->upload_to(reinterpret_cast<io::RemoteFileSystem*>(dest_fs.get()), + new_rowset_id)); + + auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start); + LOG(INFO) << "Upload rowset " << old_rowset->version() << " " << new_rowset_id.to_string() + << " to " << dest_fs->root_path().native() << ", tablet_id=" << tablet_id() + << ", duration=" << duration.count() << ", capacity=" << old_rowset->data_disk_size() + << ", tp=" << old_rowset->data_disk_size() / duration.count(); + + // gen a new rowset + auto new_rowset_meta = std::make_shared<RowsetMeta>(*old_rowset->rowset_meta()); + new_rowset_meta->set_rowset_id(new_rowset_id); + new_rowset_meta->set_resource_id(dest_fs->resource_id()); + new_rowset_meta->set_fs(dest_fs); + new_rowset_meta->set_creation_time(time(nullptr)); + RowsetSharedPtr new_rowset; + RowsetFactory::create_rowset(&_schema, _tablet_path, std::move(new_rowset_meta), &new_rowset); + + std::vector to_add {std::move(new_rowset)}; + std::vector to_delete {std::move(old_rowset)}; + + std::unique_lock meta_wlock(_meta_lock); + modify_rowsets(to_add, to_delete); + save_meta(); + return Status::OK(); +} + +RowsetSharedPtr Tablet::pick_cooldown_rowset() { + RowsetSharedPtr rowset; + { + std::shared_lock meta_rlock(_meta_lock); + + // We pick the rowset with smallest start version in local. + int64_t smallest_version = std::numeric_limits<int64_t>::max(); + for (const auto& it : _rs_version_map) { + auto& rs = it.second; + if (rs->is_local() && rs->start_version() < smallest_version) { + smallest_version = rs->start_version(); + rowset = rs; + } + } + } + return rowset; +} + +bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) { + // std::shared_lock meta_rlock(_meta_lock); + if (cooldown_resource().empty()) { Review Comment: yeah, i will change it to storage_policy soon, -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org