xy720 commented on code in PR #43642:
URL: https://github.com/apache/doris/pull/43642#discussion_r1849846090


##########
fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java:
##########
@@ -212,6 +226,27 @@ public void analyzeProperties() throws AnalysisException {
         // is atomic restore
         isAtomicRestore = eatBooleanProperty(copiedProperties, 
PROP_ATOMIC_RESTORE, isAtomicRestore);
 
+        if (copiedProperties.containsKey(PROP_STORAGE_RESOURCE)) {
+            storageResource = copiedProperties.get(PROP_STORAGE_RESOURCE);
+            Resource localResource = 
Env.getCurrentEnv().getResourceMgr().getResource(storageResource);
+
+            if (localResource == null) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
+                        "Restore storage resource " + storageResource + " is 
not exist");
+            }
+
+            if (localResource.getType() != Resource.ResourceType.S3) {

Review Comment:
   How about the hdfs resource?



##########
fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java:
##########
@@ -52,18 +53,36 @@ public class BackupMeta implements Writable, 
GsonPostProcessable {
     // resource name -> resource
     @SerializedName(value = "resourceNameMap")
     private Map<String, Resource> resourceNameMap = Maps.newHashMap();
+    // storagePolicy name -> resource
+    @SerializedName(value = "storagePolicyNameMap")
+    private Map<String, StoragePolicy> storagePolicyNameMap = 
Maps.newHashMap();
 
     private BackupMeta() {
     }
 
-    public BackupMeta(List<Table> tables, List<Resource> resources) {
+    public BackupMeta(List<Table> tables, List<Resource> resources, 
List<StoragePolicy> storagePolicys) {

Review Comment:
   ```suggestion
       public BackupMeta(List<Table> tables, List<Resource> resources, 
List<StoragePolicy> storagePolicies) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java:
##########
@@ -665,6 +686,32 @@ private void checkAndPrepareMeta() {
             }
         }
 
+        for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo : 
jobInfo.newBackupObjects.s3Resources) {
+            Resource resource = 
Env.getCurrentEnv().getResourceMgr().getResource(storageResource != null
+                    ? storageResource : backupS3ResourceInfo.name);
+            if (resource == null) {
+                continue;
+            }
+            if (resource.getType() != Resource.ResourceType.S3) {

Review Comment:
   How about the backup resource type?



##########
fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java:
##########
@@ -1253,14 +1318,114 @@ private void checkAndRestoreResources() {
             } else {
                 try {
                     // restore resource
-                    resourceMgr.createResource(remoteOdbcResource, false);
+                    resourceMgr.createResource(remoteOdbcResource);
                 } catch (DdlException e) {
                     status = new Status(ErrCode.COMMON_ERROR, e.getMessage());
                     return;
                 }
                 restoredResources.add(remoteOdbcResource);
             }
         }
+
+        if (!reserveStoragePolicy) {
+            return;
+        }
+
+        for (BackupJobInfo.BackupS3ResourceInfo backupS3ResourceInfo : 
jobInfo.newBackupObjects.s3Resources) {
+            String backupResourceName = backupS3ResourceInfo.name;
+            Resource localResource = resourceMgr.getResource(storageResource 
!= null

Review Comment:
   There should be multiple local resources for each partition



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java:
##########
@@ -429,15 +429,17 @@ public void resetPartitionIdForRestore(
         idToStoragePolicy = Maps.newHashMap();
 
         for (Map.Entry<Long, Long> entry : partitionIdMap.entrySet()) {
-            idToDataProperty.put(entry.getKey(), 
origIdToDataProperty.get(entry.getValue()));
+            idToDataProperty.put(entry.getKey(), reserveStoragePolicy
+                    ? origIdToDataProperty.get(entry.getValue()) : 
DataProperty.DEFAULT_HDD_DATA_PROPERTY);

Review Comment:
   What if there is no HDD medium?



##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -596,6 +647,9 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
             
new_tablet_meta->revise_delete_bitmap_unlocked(delete_bitmap_snapshot);
         }
 
+        //clear cooldown meta
+        new_tablet_meta->revise_clear_resource_id();

Review Comment:
   Should this resource id be refreshed in convert_rowset_ids during restore?



##########
be/src/olap/rowset/beta_rowset.cpp:
##########
@@ -432,13 +432,97 @@ Status BetaRowset::copy_files_to(const std::string& dir, 
const RowsetId& new_row
     return Status::OK();
 }
 
+Status BetaRowset::download(const StorageResource& dest_fs, const std::string& 
dir) {
+    if (is_local()) {
+        DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id();
+        return Status::InternalError("should be remote rowset. tablet_id={} 
rowset_id={}",
+                                     _rowset_meta->tablet_id(), 
rowset_id().to_string());
+    }
+
+    if (num_segments() < 1) {
+        return Status::OK();
+    }
+
+    Status status;
+    std::vector<string> linked_success_files;
+    Defer remove_linked_files {[&]() { // clear download files if errors happen

Review Comment:
   Using `linked_files` as the name is confusing because there is no link 
operation here.



##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -566,19 +569,67 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
         }
 
         std::vector<RowsetMetaSharedPtr> rs_metas;
+        RowsetMetaSharedPtr rsm;

Review Comment:
   Please add some comments



##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type 
type, const std::string& l
 
 SnapshotLoader::~SnapshotLoader() = default;
 
+static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
+                                               const std::string& dir, const 
std::string& rowset,
+                                               std::vector<std::string>* 
remote_files) {
+    bool exists = true;
+    std::vector<io::FileInfo> files;
+    RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
+    for (auto& tmp_file : files) {
+        io::Path path(tmp_file.file_name);
+        std::string file_name = path.filename();
+
+        if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
+            !_end_with(file_name, ".idx")) {
+            continue;
+        }
+        remote_files->push_back(file_name);
+    }
+
+    return Status::OK();
+}
+
+static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs,
+                                           io::RemoteFileSystem* cold_fs,
+                                           const std::string& remote_seg_path,
+                                           const std::string& local_seg_path,
+                                           const std::string& dest_seg_path) {
+    RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));
+
+    // calc md5sum of localfile
+    std::string md5sum;
+    RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path, 
&md5sum));
+
+    RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path, 
dest_seg_path, md5sum));
+
+    //delete local file
+    
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));
+
+    return Status::OK();
+}
+
+static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t 
tablet_id,
+                                   const std::string& local_path, const 
std::string& dest_path,
+                                   io::RemoteFileSystem* cold_fs, const 
std::string& rowset,
+                                   int segments, int have_inverted_index) {
+    Status res = Status::OK();
+
+    std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX, 
tablet_id);
+
+    for (int i = 0; i < segments; i++) {
+        std::string remote_seg_path = fmt::format("{}/{}_{}.dat", 
remote_tablet_path, rowset, i);
+        std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path, 
rowset, i);
+        std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path, 
rowset, i);
+
+        RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs, 
remote_seg_path,
+                                                     local_seg_path, 
dest_seg_path));
+    }
+
+    if (!have_inverted_index) {
+        return res;
+    }
+
+    std::vector<std::string> remote_index_files;
+    RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs, 
remote_tablet_path, rowset,
+                                                     &remote_index_files));
+
+    for (auto& index_file : remote_index_files) {
+        std::string remote_index_path = fmt::format("{}/{}", 
remote_tablet_path, index_file);
+        std::string local_seg_path = fmt::format("{}/{}", local_path, 
index_file);
+        std::string dest_seg_path = fmt::format("{}/{}", dest_path, 
index_file);
+
+        RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs, 
remote_index_path,
+                                                     local_seg_path, 
dest_seg_path));
+    }
+    return res;
+}
+
+static Status upload_remote_file(io::RemoteFileSystem& dest_fs, int64_t 
tablet_id,
+                                 const std::string& local_path, const 
std::string& dest_path,
+                                 const std::string& remote_file) {
+    io::FileReaderSPtr file_reader;
+    Status res = Status::OK();
+
+    std::string full_remote_path = local_path + '/' + remote_file;

Review Comment:
   Here I suggest using a proto/JSON format for the remote file info; it is easy 
to modify and serialize. Besides, you could name it something like 
tablet_id.remote_file_info. This file is in the tablet's local snapshot dir.



##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -754,9 +894,9 @@ Status SnapshotLoader::move(const std::string& 
snapshot_path, TabletSharedPtr ta
     }
 
     // rename the rowset ids and tabletid info in rowset meta
-    auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, 
tablet_id,
-                                                          
tablet->replica_id(), tablet->table_id(),
-                                                          
tablet->partition_id(), schema_hash);

Review Comment:
   Here we should refresh the storage policy id and the resource id. These two 
ids are determined by the FE in the pending state of the restore job.



##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -170,6 +171,7 @@ Result<std::vector<PendingRowsetGuard>> 
SnapshotManager::convert_rowset_ids(
     new_tablet_meta_pb.set_tablet_id(tablet_id);
     *new_tablet_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto();
     new_tablet_meta_pb.set_replica_id(replica_id);
+    new_tablet_meta_pb.set_storage_policy_id(storage_policy_id);

Review Comment:
   ```suggestion
       if (storage_policy_id > 0) {
           new_tablet_meta_pb.set_storage_policy_id(storage_policy_id);
       }
   ```



##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type 
type, const std::string& l
 
 SnapshotLoader::~SnapshotLoader() = default;
 
+static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
+                                               const std::string& dir, const 
std::string& rowset,
+                                               std::vector<std::string>* 
remote_files) {
+    bool exists = true;
+    std::vector<io::FileInfo> files;
+    RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
+    for (auto& tmp_file : files) {
+        io::Path path(tmp_file.file_name);
+        std::string file_name = path.filename();
+
+        if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
+            !_end_with(file_name, ".idx")) {
+            continue;
+        }
+        remote_files->push_back(file_name);
+    }
+
+    return Status::OK();
+}
+
+static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs,
+                                           io::RemoteFileSystem* cold_fs,
+                                           const std::string& remote_seg_path,
+                                           const std::string& local_seg_path,
+                                           const std::string& dest_seg_path) {
+    RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));
+
+    // calc md5sum of localfile
+    std::string md5sum;
+    RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path, 
&md5sum));
+
+    RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path, 
dest_seg_path, md5sum));
+
+    //delete local file
+    
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));
+
+    return Status::OK();
+}
+
+static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t 
tablet_id,
+                                   const std::string& local_path, const 
std::string& dest_path,
+                                   io::RemoteFileSystem* cold_fs, const 
std::string& rowset,

Review Comment:
   ```suggestion
                                      io::RemoteFileSystem* cold_fs, const 
std::string& rowset_id,
   ```



##########
be/src/olap/tablet.cpp:
##########
@@ -2002,6 +2002,17 @@ Status Tablet::cooldown(RowsetSharedPtr rowset) {
     return Status::OK();
 }
 
+Status Tablet::download(RowsetSharedPtr rowset, const std::string& dir) {

Review Comment:
   This function doesn't seem to be used



##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -120,6 +122,136 @@ Status SnapshotLoader::init(TStorageBackendType::type 
type, const std::string& l
 
 SnapshotLoader::~SnapshotLoader() = default;
 
+static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
+                                               const std::string& dir, const 
std::string& rowset,
+                                               std::vector<std::string>* 
remote_files) {
+    bool exists = true;
+    std::vector<io::FileInfo> files;
+    RETURN_IF_ERROR(cold_fs->list(dir, true, &files, &exists));
+    for (auto& tmp_file : files) {
+        io::Path path(tmp_file.file_name);
+        std::string file_name = path.filename();
+
+        if (file_name.substr(0, rowset.length()).compare(rowset) != 0 ||
+            !_end_with(file_name, ".idx")) {
+            continue;
+        }
+        remote_files->push_back(file_name);
+    }
+
+    return Status::OK();
+}
+
+static Status download_and_upload_one_file(io::RemoteFileSystem& dest_fs,
+                                           io::RemoteFileSystem* cold_fs,
+                                           const std::string& remote_seg_path,
+                                           const std::string& local_seg_path,
+                                           const std::string& dest_seg_path) {
+    RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));
+
+    // calc md5sum of localfile
+    std::string md5sum;
+    RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_seg_path, 
&md5sum));
+
+    RETURN_IF_ERROR(upload_with_checksum(dest_fs, local_seg_path, 
dest_seg_path, md5sum));
+
+    //delete local file
+    
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(local_seg_path));
+
+    return Status::OK();
+}
+
+static Status upload_remote_rowset(io::RemoteFileSystem& dest_fs, int64_t 
tablet_id,
+                                   const std::string& local_path, const 
std::string& dest_path,
+                                   io::RemoteFileSystem* cold_fs, const 
std::string& rowset,
+                                   int segments, int have_inverted_index) {
+    Status res = Status::OK();
+
+    std::string remote_tablet_path = fmt::format("{}/{}", DATA_PREFIX, 
tablet_id);
+
+    for (int i = 0; i < segments; i++) {
+        std::string remote_seg_path = fmt::format("{}/{}_{}.dat", 
remote_tablet_path, rowset, i);
+        std::string local_seg_path = fmt::format("{}/{}_{}.dat", local_path, 
rowset, i);
+        std::string dest_seg_path = fmt::format("{}/{}_{}.dat", dest_path, 
rowset, i);
+
+        RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs, 
remote_seg_path,
+                                                     local_seg_path, 
dest_seg_path));
+    }
+
+    if (!have_inverted_index) {
+        return res;
+    }
+
+    std::vector<std::string> remote_index_files;
+    RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs, 
remote_tablet_path, rowset,
+                                                     &remote_index_files));
+
+    for (auto& index_file : remote_index_files) {
+        std::string remote_index_path = fmt::format("{}/{}", 
remote_tablet_path, index_file);
+        std::string local_seg_path = fmt::format("{}/{}", local_path, 
index_file);
+        std::string dest_seg_path = fmt::format("{}/{}", dest_path, 
index_file);
+
+        RETURN_IF_ERROR(download_and_upload_one_file(dest_fs, cold_fs, 
remote_index_path,
+                                                     local_seg_path, 
dest_seg_path));
+    }
+    return res;
+}
+
+static Status upload_remote_file(io::RemoteFileSystem& dest_fs, int64_t 
tablet_id,

Review Comment:
   Please add comments to explain this function.



##########
be/src/olap/snapshot_manager.cpp:
##########
@@ -566,19 +569,67 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
         }
 
         std::vector<RowsetMetaSharedPtr> rs_metas;
+        RowsetMetaSharedPtr rsm;
+        bool have_remote_file = false;
+        io::FileWriterPtr file_writer;
+
         for (auto& rs : consistent_rowsets) {
             if (rs->is_local()) {
                 // local rowset
                 res = rs->link_files_to(schema_full_path, rs->rowset_id());
                 if (!res.ok()) {
                     break;
                 }
+                rsm = rs->rowset_meta();
+            } else {
+                std::string rowset_meta_str;
+                RowsetMetaPB rs_meta_pb;
+                rs->rowset_meta()->to_rowset_pb(&rs_meta_pb);
+                rs_meta_pb.SerializeToString(&rowset_meta_str);
+
+                RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
+                rowset_meta->init(rowset_meta_str);
+
+                rsm = rowset_meta;
+
+                // save_remote_file info

Review Comment:
   I suggest using the proto format for remote_file_info.



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java:
##########
@@ -429,15 +429,17 @@ public void resetPartitionIdForRestore(
         idToStoragePolicy = Maps.newHashMap();
 
         for (Map.Entry<Long, Long> entry : partitionIdMap.entrySet()) {
-            idToDataProperty.put(entry.getKey(), 
origIdToDataProperty.get(entry.getValue()));
+            idToDataProperty.put(entry.getKey(), reserveStoragePolicy
+                    ? origIdToDataProperty.get(entry.getValue()) : 
DataProperty.DEFAULT_HDD_DATA_PROPERTY);

Review Comment:
   If reserveStoragePolicy is false, here we should reset the storage policy



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to