github-actions[bot] commented on code in PR #44993: URL: https://github.com/apache/doris/pull/44993#discussion_r1879759232
##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -124,6 +132,161 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l

 SnapshotLoader::~SnapshotLoader() = default;

+static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
+                                               const std::string& dir, const std::string& rowset,
+                                               std::vector<std::string>* remote_files) {
+    bool exists = true;
+    std::vector<io::FileInfo> files;
+    RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
+    for (auto& tmp_file : files) {
+        io::Path path(tmp_file.file_name);
+        std::string file_name = path.filename();
+
+        if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
+            !_end_with(file_name, ".idx")) {
+            continue;
+        }
+        remote_files->push_back(file_name);
+    }
+
+    return Status::OK();
+}
+
+static Status check_need_upload(const std::string& src_path, const std::string& local_file,
+                                std::map<std::string, FileStat>& remote_files, std::string* md5sum,
+                                bool* need_upload) {
+    // calc md5sum of localfile
+    RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(src_path + "/" + local_file, md5sum));
+    VLOG_CRITICAL << "get file checksum: " << local_file << ": " << *md5sum;
+
+    // check if this local file need upload
+    auto find = remote_files.find(local_file);
+    if (find != remote_files.end()) {
+        if (*md5sum != find->second.md5) {
+            // remote storage file exist, but with different checksum
+            LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first
+                         << ", local: " << *md5sum;
+            // TODO(cmy): save these files and delete them later
+            *need_upload = true;
+        }
+    } else {
+        *need_upload = true;
+    }
+
+    return Status::OK();
+}
+
+static Status download_and_upload_one_cold_file(
+        io::RemoteFileSystem& dest_fs, io::RemoteFileSystem* cold_fs,
+        const std::string& remote_seg_path, const std::string& local_seg_path,
+        const std::string& dest_seg_path, const std::string& local_path,
+        const std::string& local_file, std::map<std::string, FileStat>& remote_files) {
+    RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));
+
+    bool need_upload = false;
+    std::string md5sum;
+    RETURN_IF_ERROR(check_need_upload(local_path, local_file, remote_files, &md5sum, &need_upload));
+
+    if (!need_upload) {
+        VLOG_CRITICAL << "cold file exist in remote path, no need to upload: " << local_file;
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path, dest_seg_path, md5sum));
+
+    //delete local file
+    RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));
+
+    return Status::OK();
+}
+
+static Status upload_remote_cold_rowset(io::RemoteFileSystem& dest_fs, int64_t tablet_id,
+                                        const std::string& local_path, const std::string& dest_path,
+                                        io::RemoteFileSystem* cold_fs, const std::string& rowset_id,
+                                        int segments, int have_inverted_index,
+                                        std::map<std::string, FileStat>& remote_files) {
+    Status res = Status::OK();
+
+    for (int i = 0; i < segments; i++) {
+        std::string local_file = fmt::format("{}_{}.dat", rowset_id, i);
+        std::string remote_seg_path =
+                fmt::format("{}/{}_{}.dat", remote_tablet_path(tablet_id), rowset_id, i);
+        std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path, rowset_id, i);
+        std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path, rowset_id, i);
+
+        RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_seg_path,
+                                                          local_seg_path, dest_seg_path, local_path,
+                                                          local_file, remote_files));
+    }
+
+    if (!have_inverted_index) {
+        return res;
+    }
+
+    std::vector<std::string> remote_index_files;
+    RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs, remote_tablet_path(tablet_id),
+                                                     rowset_id, &remote_index_files));
+
+    for (auto& index_file : remote_index_files) {
+        std::string remote_index_path =
+                fmt::format("{}/{}", remote_tablet_path(tablet_id), index_file);
+        std::string local_seg_path = fmt::format("{}/{}", local_path, index_file);
+        std::string dest_seg_path = fmt::format("{}/{}", dest_path, index_file);
+
+        RETURN_IF_ERROR(download_and_upload_one_cold_file(dest_fs, cold_fs, remote_index_path,
+                                                          local_seg_path, dest_seg_path, local_path,
+                                                          index_file, remote_files));
+    }
+    return res;
+}
+
+/*
+ * get the cooldown data info from the hdr file, download the cooldown data and
+ * upload it to remote storage.
+ */
+static Status upload_remote_cold_file(io::RemoteFileSystem& dest_fs, int64_t tablet_id,
+                                      const std::string& local_path, const std::string& dest_path,
+                                      std::map<std::string, FileStat>& remote_files) {
+    Status res = Status::OK();
+    std::string hdr_file = local_path + "/" + std::to_string(tablet_id) + ".hdr";
+
+    auto tablet_meta = std::make_shared<TabletMeta>();
+    res = tablet_meta->create_from_file(hdr_file);
+    if (!res.ok()) {
+        return Status::Error<ErrorCode::ENGINE_LOAD_INDEX_TABLE_ERROR>(
+                "fail to load tablet_meta. file_path={}", hdr_file);
+    }
+
+    if (tablet_meta->tablet_id() != tablet_id) {
+        return Status::InternalError("Invalid tablet {}", tablet_meta->tablet_id());
+    }
+
+    if (!tablet_meta->cooldown_meta_id().initialized()) {
+        return res;
+    }
+
+    string rowset_id;
+    int segments;
+    int have_inverted_index;
+
+    std::shared_ptr<io::RemoteFileSystem> colddata_fs;
+    RETURN_IF_ERROR(get_remote_file_system(tablet_meta->storage_policy_id(), &colddata_fs));
+
+    for (auto rowset_meta : tablet_meta->all_rs_metas()) {
+        rowset_id = rowset_meta->rowset_id().to_string();
+        segments = rowset_meta->num_segments();
+        have_inverted_index = rowset_meta->tablet_schema()->has_inverted_index();
+
+        if (segments > 0 && !rowset_meta->is_local()) {
+            RETURN_IF_ERROR(upload_remote_cold_rowset(dest_fs, tablet_id, local_path, dest_path,
+                                                      colddata_fs.get(), rowset_id, segments,
+                                                      have_inverted_index, remote_files));
+        }
+    }
+
+    return res;
+}
+
 Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path,

Review Comment:
   warning: function 'upload' has cognitive complexity of 54 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path,
                          ^
   ```

   <details>
   <summary>Additional context</summary>

   **be/src/runtime/snapshot_loader.cpp:291:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!_remote_fs) {
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:299:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::UPLOAD));
       ^
   ```

   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do { \
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:299:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::UPLOAD));
       ^
   ```

   **be/src/common/status.h:623:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```

   **be/src/runtime/snapshot_loader.cpp:303:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true));
       ^
   ```

   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do { \
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:303:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true));
       ^
   ```

   **be/src/common/status.h:623:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```

   **be/src/runtime/snapshot_loader.cpp:311:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
       for (auto iter = src_to_dest_path.begin(); iter != src_to_dest_path.end(); iter++) {
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:317:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(
           ^
   ```

   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do { \
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:317:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(
           ^
   ```

   **be/src/common/status.h:623:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```

   **be/src/runtime/snapshot_loader.cpp:322:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_list_with_checksum(dest_path, &remote_files));
           ^
   ```

   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do { \
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:322:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_list_with_checksum(dest_path, &remote_files));
           ^
   ```

   **be/src/common/status.h:623:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```

   **be/src/runtime/snapshot_loader.cpp:331:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_get_existing_files_from_local(src_path, &local_files));
           ^
   ```

   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do { \
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:331:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_get_existing_files_from_local(src_path, &local_files));
           ^
   ```

   **be/src/common/status.h:623:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```

   **be/src/runtime/snapshot_loader.cpp:334:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           for (auto it = local_files.begin(); it != local_files.end(); it++) {
           ^
   ```

   **be/src/runtime/snapshot_loader.cpp:335:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
               ^
   ```

   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do { \
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:335:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
               ^
   ```

   **be/src/common/status.h:623:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```

   **be/src/runtime/snapshot_loader.cpp:340:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(
               ^
   ```

   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do { \
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:340:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(
               ^
   ```

   **be/src/common/status.h:623:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```

   **be/src/runtime/snapshot_loader.cpp:343:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!need_upload) {
               ^
   ```

   **be/src/runtime/snapshot_loader.cpp:351:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum));
               ^
   ```

   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do { \
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:351:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(upload_with_checksum(*_remote_fs, local_path, remote_path, md5sum));
               ^
   ```

   **be/src/common/status.h:623:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```

   **be/src/runtime/snapshot_loader.cpp:355:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(
           ^
   ```

   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do { \
       ^
   ```

   **be/src/runtime/snapshot_loader.cpp:355:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(
           ^
   ```

   **be/src/common/status.h:623:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```

   </details>

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org