xy720 commented on code in PR #43642: URL: https://github.com/apache/doris/pull/43642#discussion_r1849846090
########## fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java: ########## @@ -212,6 +226,27 @@ public void analyzeProperties() throws AnalysisException { // is atomic restore isAtomicRestore = eatBooleanProperty(copiedProperties, PROP_ATOMIC_RESTORE, isAtomicRestore); + if (copiedProperties.containsKey(PROP_STORAGE_RESOURCE)) { + storageResource = copiedProperties.get(PROP_STORAGE_RESOURCE); + Resource localResource = Env.getCurrentEnv().getResourceMgr().getResource(storageResource); + + if (localResource == null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR, + "Restore storage resource " + storageResource + " is not exist"); + } + + if (localResource.getType() != Resource.ResourceType.S3) { Review Comment: How about the hdfs resource? ########## fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java: ########## @@ -52,18 +53,36 @@ public class BackupMeta implements Writable, GsonPostProcessable { // resource name -> resource @SerializedName(value = "resourceNameMap") private Map<String, Resource> resourceNameMap = Maps.newHashMap(); + // storagePolicy name -> resource + @SerializedName(value = "storagePolicyNameMap") + private Map<String, StoragePolicy> storagePolicyNameMap = Maps.newHashMap(); private BackupMeta() { } - public BackupMeta(List<Table> tables, List<Resource> resources) { + public BackupMeta(List<Table> tables, List<Resource> resources, List<StoragePolicy> storagePolicys) { Review Comment: ```suggestion public BackupMeta(List<Table> tables, List<Resource> resources, List<StoragePolicy> storagePolicies) { ``` ########## fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java: ########## @@ -665,6 +686,32 @@ private void checkAndPrepareMeta() { } } + for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo : jobInfo.newBackupObjects.s3Resources) { + Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(storageResource != null + ? 
storageResource : backupS3ResourceInfo.name); + if (resource == null) { + continue; + } + if (resource.getType() != Resource.ResourceType.S3) { Review Comment: How about the backup resource type? ########## fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java: ########## @@ -1253,14 +1318,114 @@ private void checkAndRestoreResources() { } else { try { // restore resource - resourceMgr.createResource(remoteOdbcResource, false); + resourceMgr.createResource(remoteOdbcResource); } catch (DdlException e) { status = new Status(ErrCode.COMMON_ERROR, e.getMessage()); return; } restoredResources.add(remoteOdbcResource); } } + + if (!reserveStoragePolicy) { + return; + } + + for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo : jobInfo.newBackupObjects.s3Resources) { + String backupResourceName = backupS3ResourceInfo.name; + Resource localResource = resourceMgr.getResource(storageResource != null Review Comment: There should be multiple local resources for each partition ########## fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java: ########## @@ -429,15 +429,17 @@ public void resetPartitionIdForRestore( idToStoragePolicy = Maps.newHashMap(); for (Map.Entry<Long, Long> entry : partitionIdMap.entrySet()) { - idToDataProperty.put(entry.getKey(), origIdToDataProperty.get(entry.getValue())); + idToDataProperty.put(entry.getKey(), reserveStoragePolicy + ? origIdToDataProperty.get(entry.getValue()) : DataProperty.DEFAULT_HDD_DATA_PROPERTY); Review Comment: What if there is no HDD medium? ########## be/src/olap/snapshot_manager.cpp: ########## @@ -596,6 +647,9 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet new_tablet_meta->revise_delete_bitmap_unlocked(delete_bitmap_snapshot); } + //clear cooldown meta + new_tablet_meta->revise_clear_resource_id(); Review Comment: This resource id should be refreshed in restore convert_rowset_ids? 
########## be/src/olap/rowset/beta_rowset.cpp: ########## @@ -432,13 +432,97 @@ Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_row return Status::OK(); } +Status BetaRowset::download(const StorageResource& dest_fs, const std::string& dir) { + if (is_local()) { + DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); + return Status::InternalError("should be remote rowset. tablet_id={} rowset_id={}", + _rowset_meta->tablet_id(), rowset_id().to_string()); + } + + if (num_segments() < 1) { + return Status::OK(); + } + + Status status; + std::vector<string> linked_success_files; + Defer remove_linked_files {[&]() { // clear download files if errors happen Review Comment: Using linked_files as name is confusing because there is no link operation here ########## be/src/olap/snapshot_manager.cpp: ########## @@ -566,19 +569,67 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet } std::vector<RowsetMetaSharedPtr> rs_metas; + RowsetMetaSharedPtr rsm; Review Comment: Please add some comments ########## be/src/runtime/snapshot_loader.cpp: ########## @@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l SnapshotLoader::~SnapshotLoader() = default; +static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs, + const std::string& dir, const std::string& rowset, + std::vector<std::string>* remote_files) { + bool exists = true; + std::vector<io::FileInfo> files; + RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists)); + for (auto& tmp_file : files) { + io::Path path(tmp_file.file_name); + std::string file_name = path.filename(); + + if (file_name.substr(0, rowset.length()).compare(rowset) != 0 || + !_end_with(file_name, ".idx")) { + continue; + } + remote_files->push_back(file_name); + } + + return Status::OK(); +} + +static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs, + io::RemoteFileSystem* cold_fs, + const 
std::string& remote_seg_path, + const std::string& local_seg_path, + const std::string& dest_seg_path) { + RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path)); + + // calc md5sum of localfile + std::string md5sum; + RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path, &md5sum)); + + RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path, dest_seg_path, md5sum)); + + //delete local file + RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path)); + + return Status::OK(); +} + +static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t tablet_id, + const std::string& local_path, const std::string& dest_path, + io::RemoteFileSystem* cold_fs, const std::string& rowset, + int segments, int have_inverted_index) { + Status res = Status::OK(); + + std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX, tablet_id); + + for (int i = 0; i < segments; i++) { + std::string remote_seg_path = fmt::format("{}/{}_{}.dat", remote_tablet_path, rowset, i); + std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path, rowset, i); + std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path, rowset, i); + + RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs, remote_seg_path, + local_seg_path, dest_seg_path)); + } + + if (!have_inverted_index) { + return res; + } + + std::vector<std::string> remote_index_files; + RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs, remote_tablet_path, rowset, + &remote_index_files)); + + for (auto& index_file : remote_index_files) { + std::string remote_index_path = fmt::format("{}/{}", remote_tablet_path, index_file); + std::string local_seg_path = fmt::format("{}/{}", local_path, index_file); + std::string dest_seg_path = fmt::format("{}/{}", dest_path, index_file); + + RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs, remote_index_path, + local_seg_path, dest_seg_path)); + } + return res; +} + +static Status 
upload_remote_file(io::RemoteFileSystem& dest_fs, int64_t tablet_id, + const std::string& local_path, const std::string& dest_path, + const std::string& remote_file) { + io::FileReaderSPtr file_reader; + Status res = Status::OK(); + + std::string full_remote_path = local_path + '/' + remote_file; Review Comment: Here I suggest using the proto/json format for the remote file info; it is easy to modify and serialize. Besides, you can name it something like tablet_id.remote_file_info; this file is in the tablet's local snapshot dir. ########## be/src/runtime/snapshot_loader.cpp: ########## @@ -754,9 +894,9 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta } // rename the rowset ids and tabletid info in rowset meta - auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id, - tablet->replica_id(), tablet->table_id(), - tablet->partition_id(), schema_hash); Review Comment: Here we should refresh the storage policy id and resource id; these two ids are determined by FE in the pending state of the restore job. 
########## be/src/olap/snapshot_manager.cpp: ########## @@ -170,6 +171,7 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids( new_tablet_meta_pb.set_tablet_id(tablet_id); *new_tablet_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto(); new_tablet_meta_pb.set_replica_id(replica_id); + new_tablet_meta_pb.set_storage_policy_id(storage_policy_id); Review Comment: ```suggestion if (storage_policy_id > 0) { new_tablet_meta_pb.set_storage_policy_id(storage_policy_id); } ``` ########## be/src/runtime/snapshot_loader.cpp: ########## @@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l SnapshotLoader::~SnapshotLoader() = default; +static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs, + const std::string& dir, const std::string& rowset, + std::vector<std::string>* remote_files) { + bool exists = true; + std::vector<io::FileInfo> files; + RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists)); + for (auto& tmp_file : files) { + io::Path path(tmp_file.file_name); + std::string file_name = path.filename(); + + if (file_name.substr(0, rowset.length()).compare(rowset) != 0 || + !_end_with(file_name, ".idx")) { + continue; + } + remote_files->push_back(file_name); + } + + return Status::OK(); +} + +static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs, + io::RemoteFileSystem* cold_fs, + const std::string& remote_seg_path, + const std::string& local_seg_path, + const std::string& dest_seg_path) { + RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path)); + + // calc md5sum of localfile + std::string md5sum; + RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path, &md5sum)); + + RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path, dest_seg_path, md5sum)); + + //delete local file + RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path)); + + return Status::OK(); +} + +static Status 
upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t tablet_id, + const std::string& local_path, const std::string& dest_path, + io::RemoteFileSystem* cold_fs, const std::string& rowset, Review Comment: ```suggestion io::RemoteFileSystem* cold_fs, const std::string& rowset_id, ``` ########## be/src/olap/tablet.cpp: ########## @@ -2002,6 +2002,17 @@ Status Tablet::cooldown(RowsetSharedPtr rowset) { return Status::OK(); } +Status Tablet::download(RowsetSharedPtr rowset, const std::string& dir) { Review Comment: This function doesn't seem to be used ########## be/src/runtime/snapshot_loader.cpp: ########## @@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l SnapshotLoader::~SnapshotLoader() = default; +static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs, + const std::string& dir, const std::string& rowset, + std::vector<std::string>* remote_files) { + bool exists = true; + std::vector<io::FileInfo> files; + RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists)); + for (auto& tmp_file : files) { + io::Path path(tmp_file.file_name); + std::string file_name = path.filename(); + + if (file_name.substr(0, rowset.length()).compare(rowset) != 0 || + !_end_with(file_name, ".idx")) { + continue; + } + remote_files->push_back(file_name); + } + + return Status::OK(); +} + +static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs, + io::RemoteFileSystem* cold_fs, + const std::string& remote_seg_path, + const std::string& local_seg_path, + const std::string& dest_seg_path) { + RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path)); + + // calc md5sum of localfile + std::string md5sum; + RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path, &md5sum)); + + RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path, dest_seg_path, md5sum)); + + //delete local file + RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path)); + + return 
Status::OK(); +} + +static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t tablet_id, + const std::string& local_path, const std::string& dest_path, + io::RemoteFileSystem* cold_fs, const std::string& rowset, + int segments, int have_inverted_index) { + Status res = Status::OK(); + + std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX, tablet_id); + + for (int i = 0; i < segments; i++) { + std::string remote_seg_path = fmt::format("{}/{}_{}.dat", remote_tablet_path, rowset, i); + std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path, rowset, i); + std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path, rowset, i); + + RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs, remote_seg_path, + local_seg_path, dest_seg_path)); + } + + if (!have_inverted_index) { + return res; + } + + std::vector<std::string> remote_index_files; + RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs, remote_tablet_path, rowset, + &remote_index_files)); + + for (auto& index_file : remote_index_files) { + std::string remote_index_path = fmt::format("{}/{}", remote_tablet_path, index_file); + std::string local_seg_path = fmt::format("{}/{}", local_path, index_file); + std::string dest_seg_path = fmt::format("{}/{}", dest_path, index_file); + + RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs, remote_index_path, + local_seg_path, dest_seg_path)); + } + return res; +} + +static Status upload_remote_file(io::RemoteFileSystem& dest_fs, int64_t tablet_id, Review Comment: Please add comments to explain this function. 
########## be/src/olap/snapshot_manager.cpp: ########## @@ -566,19 +569,67 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet } std::vector<RowsetMetaSharedPtr> rs_metas; + RowsetMetaSharedPtr rsm; + bool have_remote_file = false; + io::FileWriterPtr file_writer; + for (auto& rs : consistent_rowsets) { if (rs->is_local()) { // local rowset res = rs->link_files_to(schema_full_path, rs->rowset_id()); if (!res.ok()) { break; } + rsm = rs->rowset_meta(); + } else { + std::string rowset_meta_str; + RowsetMetaPB rs_meta_pb; + rs->rowset_meta()->to_rowset_pb(&rs_meta_pb); + rs_meta_pb.SerializeToString(&rowset_meta_str); + + RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); + rowset_meta->init(rowset_meta_str); + + rsm = rowset_meta; + + // save_remote_file info Review Comment: I suggest proto format remote_file_info ########## fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java: ########## @@ -429,15 +429,17 @@ public void resetPartitionIdForRestore( idToStoragePolicy = Maps.newHashMap(); for (Map.Entry<Long, Long> entry : partitionIdMap.entrySet()) { - idToDataProperty.put(entry.getKey(), origIdToDataProperty.get(entry.getValue())); + idToDataProperty.put(entry.getKey(), reserveStoragePolicy + ? origIdToDataProperty.get(entry.getValue()) : DataProperty.DEFAULT_HDD_DATA_PROPERTY); Review Comment: If reserveStoragePolicy is false, here we should reset the storage policy -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org