This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1e9bea28fd1 branch-3.0: [improve](restore) Link existing rowset files with source rowset id #48435 (#48999)
1e9bea28fd1 is described below

commit 1e9bea28fd106ffc735abc5a02a69e87ebe8520e
Author: walter <maoch...@selectdb.com>
AuthorDate: Sat Mar 15 10:39:58 2025 +0800

    branch-3.0: [improve](restore) Link existing rowset files with source rowset id #48435 (#48999)
    
    cherry pick from #48435
---
 be/src/cloud/pb_convert.cpp                        |   8 +
 be/src/olap/snapshot_manager.cpp                   |   8 +-
 be/src/olap/tablet_meta.cpp                        |  12 +-
 be/src/olap/tablet_meta.h                          |   1 +
 be/src/runtime/snapshot_loader.cpp                 | 890 ++++++++++++++-------
 be/src/runtime/snapshot_loader.h                   |   3 +
 be/src/service/backend_service.cpp                 |   8 +-
 be/src/util/stopwatch.hpp                          |   5 +-
 gensrc/proto/olap_file.proto                       |   7 +
 .../ccr_syncer_p1/test_backup_restore.groovy       |   9 +-
 10 files changed, 652 insertions(+), 299 deletions(-)

diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp
index 2c51e97dd57..ec483ba682c 100644
--- a/be/src/cloud/pb_convert.cpp
+++ b/be/src/cloud/pb_convert.cpp
@@ -85,6 +85,8 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
     out->set_has_variant_type_in_schema(in.has_has_variant_type_in_schema());
     out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
     out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
+    out->set_source_rowset_id(in.source_rowset_id());
+    out->set_source_tablet_id(in.source_tablet_id());
 }
 
 void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
@@ -137,6 +139,8 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
     out->set_has_variant_type_in_schema(in.has_variant_type_in_schema());
     out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
     out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
+    out->set_source_rowset_id(in.source_rowset_id());
+    out->set_source_tablet_id(in.source_tablet_id());
 }
 
 static void fill_schema_with_dict(const RowsetMetaCloudPB& in, RowsetMetaPB* out,
@@ -235,6 +239,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
     out->set_enable_segments_file_size(in.enable_segments_file_size());
     out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
     out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
+    out->set_source_rowset_id(in.source_rowset_id());
+    out->set_source_tablet_id(in.source_tablet_id());
 }
 
 void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
@@ -288,6 +294,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
     out->set_enable_segments_file_size(in.enable_segments_file_size());
     out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
     out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
+    out->set_source_rowset_id(in.source_rowset_id());
+    out->set_source_tablet_id(in.source_tablet_id());
 }
 
 TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB& in) {
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 7f0e94274d9..a59ed36bb82 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -184,10 +184,8 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
 
     // load original tablet meta
     auto cloned_meta_file = fmt::format("{}/{}.hdr", clone_dir, tablet_id);
-    TabletMeta cloned_tablet_meta;
-    RETURN_IF_ERROR_RESULT(cloned_tablet_meta.create_from_file(cloned_meta_file));
     TabletMetaPB cloned_tablet_meta_pb;
-    cloned_tablet_meta.to_meta_pb(&cloned_tablet_meta_pb);
+    RETURN_IF_ERROR_RESULT(TabletMeta::load_from_file(cloned_meta_file, &cloned_tablet_meta_pb));
 
     TabletMetaPB new_tablet_meta_pb;
     new_tablet_meta_pb = cloned_tablet_meta_pb;
@@ -230,6 +228,8 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
                 src_rs_id.init(visible_rowset.rowset_id_v2());
             }
             rowset_id_mapping[src_rs_id] = rowset_id;
+            rowset_meta->set_source_rowset_id(visible_rowset.rowset_id_v2());
+            rowset_meta->set_source_tablet_id(cloned_tablet_meta_pb.tablet_id());
         } else {
             // remote rowset
             *rowset_meta = visible_rowset;
@@ -265,6 +265,8 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
                 src_rs_id.init(stale_rowset.rowset_id_v2());
             }
             rowset_id_mapping[src_rs_id] = rowset_id;
+            rowset_meta->set_source_rowset_id(stale_rowset.rowset_id_v2());
+            rowset_meta->set_source_tablet_id(cloned_tablet_meta_pb.tablet_id());
         } else {
             // remote rowset
             *rowset_meta = stale_rowset;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 50a62899a9d..9bbb99a44b6 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -453,18 +453,22 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco
 }
 
 Status TabletMeta::create_from_file(const string& file_path) {
+    TabletMetaPB tablet_meta_pb;
+    RETURN_IF_ERROR(load_from_file(file_path, &tablet_meta_pb));
+    init_from_pb(tablet_meta_pb);
+    return Status::OK();
+}
+
+Status TabletMeta::load_from_file(const string& file_path, TabletMetaPB* tablet_meta_pb) {
     FileHeader<TabletMetaPB> file_header(file_path);
     // In file_header.deserialize(), it validates file length, signature, checksum of protobuf.
     RETURN_IF_ERROR(file_header.deserialize());
-    TabletMetaPB tablet_meta_pb;
     try {
-        tablet_meta_pb.CopyFrom(file_header.message());
+        tablet_meta_pb->CopyFrom(file_header.message());
     } catch (...) {
         return Status::Error<PARSE_PROTOBUF_ERROR>("fail to copy protocol buffer object. file={}",
                                                    file_path);
     }
-
-    init_from_pb(tablet_meta_pb);
     return Status::OK();
 }
 
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index cc7daf8a67a..dc4be4f5ef0 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -129,6 +129,7 @@ public:
     // Function create_from_file is used to be compatible with previous tablet_meta.
     // Previous tablet_meta is a physical file in tablet dir, which is not stored in rocksdb.
     Status create_from_file(const std::string& file_path);
+    static Status load_from_file(const std::string& file_path, TabletMetaPB* tablet_meta_pb);
     Status save(const std::string& file_path);
     Status save_as_json(const string& file_path);
     static Status save(const std::string& file_path, const TabletMetaPB& tablet_meta_pb);
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index 9007a4cc2b6..fd204af7c8d 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -20,6 +20,7 @@
 // IWYU pragma: no_include <bthread/errno.h>
 #include <errno.h> // IWYU pragma: keep
 #include <fmt/format.h>
+#include <gen_cpp/AgentService_types.h>
 #include <gen_cpp/FrontendService.h>
 #include <gen_cpp/FrontendService_types.h>
 #include <gen_cpp/HeartbeatService_types.h>
@@ -60,6 +61,108 @@
 
 namespace doris {
 
+struct LocalFileStat {
+    uint64_t size;
+    std::string md5;
+};
+
+struct RemoteFileStat {
+    std::string url;
+    std::string md5;
+    uint64_t size;
+};
+
+class SnapshotHttpDownloader {
+public:
+    SnapshotHttpDownloader(const TRemoteTabletSnapshot& remote_tablet_snapshot,
+                           TabletSharedPtr tablet, SnapshotLoader& snapshot_loader)
+            : _tablet(std::move(tablet)),
+              _snapshot_loader(snapshot_loader),
+              _local_tablet_id(remote_tablet_snapshot.local_tablet_id),
+              _remote_tablet_id(remote_tablet_snapshot.remote_tablet_id),
+              _local_path(remote_tablet_snapshot.local_snapshot_path),
+              _remote_path(remote_tablet_snapshot.remote_snapshot_path),
+              _remote_be_addr(remote_tablet_snapshot.remote_be_addr) {
+        auto& token = remote_tablet_snapshot.remote_token;
+        auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
+
+        // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
+        _base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}&channel=ingest_binlog",
+                                remote_be_addr.hostname, remote_be_addr.port, token);
+    }
+    ~SnapshotHttpDownloader() = default;
+    SnapshotHttpDownloader(const SnapshotHttpDownloader&) = delete;
+    SnapshotHttpDownloader& operator=(const SnapshotHttpDownloader&) = delete;
+
+    void set_report_progress_callback(std::function<Status()> report_progress) {
+        _report_progress_callback = std::move(report_progress);
+    }
+
+    Status download();
+
+private:
+    constexpr static int kDownloadFileMaxRetry = 3;
+
+    // Load existing files from local snapshot path, compute the md5sum of the files
+    // if enable_download_md5sum_check is true
+    Status _load_existing_files();
+
+    // List remote files from remote be, and find the hdr file
+    Status _list_remote_files();
+
+    // Download hdr file from remote be to a tmp file
+    Status _download_hdr_file();
+
+    // Link same rowset files by compare local hdr file and remote hdr file
+    // if the local files are copied from the remote rowset, link them as the
+    // remote rowset files, to avoid the duplicated downloading.
+    Status _link_same_rowset_files();
+
+    // Get all remote file stats, excluding the hdr file.
+    Status _get_remote_file_stats();
+
+    // Compute the need download files according to the local files md5sum (if enable_download_md5sum_check is true)
+    void _get_need_download_files();
+
+    // Download all need download files
+    Status _download_files();
+
+    // Install remote hdr file to local snapshot path from the tmp file
+    Status _install_remote_hdr_file();
+
+    // Delete orphan files, which are not in remote
+    Status _delete_orphan_files();
+
+    // Download a file from remote be to local path with the file stat
+    Status _download_http_file(DataDir* data_dir, const std::string& remote_file_url,
+                               const std::string& local_file_path,
+                               const RemoteFileStat& remote_filestat);
+
+    // Get the file stat from remote be
+    Status _get_http_file_stat(const std::string& remote_file_url, RemoteFileStat* file_stat);
+
+    TabletSharedPtr _tablet;
+    SnapshotLoader& _snapshot_loader;
+    std::function<Status()> _report_progress_callback;
+
+    std::string _base_url;
+    int64_t _local_tablet_id;
+    int64_t _remote_tablet_id;
+    const std::string& _local_path;
+    const std::string& _remote_path;
+    const TNetworkAddress& _remote_be_addr;
+
+    std::string _local_hdr_filename;
+    std::string _remote_hdr_filename;
+    std::vector<std::string> _remote_file_list;
+    std::unordered_map<std::string, LocalFileStat> _local_files;
+    std::unordered_map<std::string, RemoteFileStat> _remote_files;
+
+    std::string _tmp_hdr_file;
+    RemoteFileStat _remote_hdr_filestat;
+    std::vector<std::string> _need_download_files;
+};
+
 static std::string get_loaded_tag_path(const std::string& snapshot_path) {
     return snapshot_path + "/LOADED";
 }
@@ -96,6 +199,482 @@ bool _end_with(std::string_view str, std::string_view match) {
            str.compare(str.size() - match.size(), match.size(), match) == 0;
 }
 
+Status SnapshotHttpDownloader::_get_http_file_stat(const std::string& remote_file_url,
+                                                   RemoteFileStat* file_stat) {
+    uint64_t file_size = 0;
+    std::string file_md5;
+    auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) {
+        int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
+        std::string url = remote_file_url;
+        if (config::enable_download_md5sum_check) {
+            // compute md5sum is time-consuming, so we set a longer timeout
+            timeout_ms = config::download_binlog_meta_timeout_ms * 3;
+            url = fmt::format("{}&acquire_md5=true", remote_file_url);
+        }
+        RETURN_IF_ERROR(client->init(url));
+        client->set_timeout_ms(timeout_ms);
+        RETURN_IF_ERROR(client->head());
+        RETURN_IF_ERROR(client->get_content_length(&file_size));
+        if (config::enable_download_md5sum_check) {
+            RETURN_IF_ERROR(client->get_content_md5(&file_md5));
+        }
+        return Status::OK();
+    };
+    RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb));
+    file_stat->url = remote_file_url;
+    file_stat->size = file_size;
+    file_stat->md5 = std::move(file_md5);
+    return Status::OK();
+}
+
+Status SnapshotHttpDownloader::_download_http_file(DataDir* data_dir,
+                                                   const std::string& remote_file_url,
+                                                   const std::string& local_file_path,
+                                                   const RemoteFileStat& remote_filestat) {
+    auto file_size = remote_filestat.size;
+    const auto& remote_file_md5 = remote_filestat.md5;
+
+    // check disk capacity
+    if (data_dir->reach_capacity_limit(file_size)) {
+        return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
+                "reach the capacity limit of path {}, file_size={}", data_dir->path(), file_size);
+    }
+
+    uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
+    if (estimate_timeout < config::download_low_speed_time) {
+        estimate_timeout = config::download_low_speed_time;
+    }
+
+    LOG(INFO) << "clone begin to download file from: " << remote_file_url
+              << " to: " << local_file_path << ". size(B): " << file_size
+              << ", timeout(s): " << estimate_timeout;
+
+    auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout, &local_file_path,
+                        file_size](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(remote_file_url));
+        client->set_timeout_ms(estimate_timeout * 1000);
+        RETURN_IF_ERROR(client->download(local_file_path));
+
+        std::error_code ec;
+        // Check file length
+        uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+        if (ec) {
+            LOG(WARNING) << "download file error" << ec.message();
+            return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path,
+                                   ec.message());
+        }
+        if (local_file_size != file_size) {
+            LOG(WARNING) << "download file length error"
+                         << ", remote_path=" << remote_file_url << ", file_size=" << file_size
+                         << ", local_file_size=" << local_file_size;
+            return Status::InternalError("downloaded file size is not equal");
+        }
+
+        if (!remote_file_md5.empty()) { // keep compatibility
+            std::string local_file_md5;
+            RETURN_IF_ERROR(
+                    io::global_local_filesystem()->md5sum(local_file_path, &local_file_md5));
+            if (local_file_md5 != remote_file_md5) {
+                LOG(WARNING) << "download file md5 error"
+                             << ", remote_file_url=" << remote_file_url
+                             << ", local_file_path=" << local_file_path
+                             << ", remote_file_md5=" << remote_file_md5
+                             << ", local_file_md5=" << local_file_md5;
+                return Status::RuntimeError(
+                        "download file {} md5 is not equal, local={}, remote={}", remote_file_url,
+                        local_file_md5, remote_file_md5);
+            }
+        }
+
+        return io::global_local_filesystem()->permission(local_file_path,
+                                                         io::LocalFileSystem::PERMS_OWNER_RW);
+    };
+    auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to download file from " << remote_file_url
+                     << ", status: " << status.to_string();
+        return status;
+    }
+
+    return Status::OK();
+}
+
+Status SnapshotHttpDownloader::_load_existing_files() {
+    std::vector<std::string> existing_files;
+    RETURN_IF_ERROR(_snapshot_loader._get_existing_files_from_local(_local_path, &existing_files));
+    for (auto& local_file : existing_files) {
+        // add file size
+        std::string local_file_path = _local_path + "/" + local_file;
+        std::error_code ec;
+        uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+        if (ec) {
+            LOG(WARNING) << "download file error, can't retrive file_size of " << local_file_path
+                         << ", due to " << ec.message();
+            return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path,
+                                   ec.message());
+        }
+
+        // get md5sum
+        std::string md5;
+        if (config::enable_download_md5sum_check) {
+            auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5);
+            if (!status.ok()) {
+                LOG(WARNING) << "download file error, local file " << local_file_path
+                             << " md5sum: " << status.to_string();
+                return status;
+            }
+        }
+        _local_files[local_file] = {local_file_size, md5};
+
+        // get hdr file
+        if (local_file.ends_with(".hdr")) {
+            _local_hdr_filename = local_file;
+        }
+    }
+
+    return Status::OK();
+}
+
+Status SnapshotHttpDownloader::_list_remote_files() {
+    // get all these use http download action
+    // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
+    std::string remote_url_prefix = fmt::format("{}&file={}", _base_url, _remote_path);
+
+    LOG(INFO) << "list remote files: " << remote_url_prefix << ", job: " << _snapshot_loader._job_id
+              << ", task id: " << _snapshot_loader._task_id << ", remote be: " << _remote_be_addr;
+
+    std::string remote_file_list_str;
+    auto list_files_cb = [&remote_url_prefix, &remote_file_list_str](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(remote_url_prefix));
+        client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
+        return client->execute(&remote_file_list_str);
+    };
+    RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb));
+
+    _remote_file_list = strings::Split(remote_file_list_str, "\n", strings::SkipWhitespace());
+
+    // find hdr file
+    auto hdr_file =
+            std::find_if(_remote_file_list.begin(), _remote_file_list.end(),
+                         [](const std::string& filename) { return _end_with(filename, ".hdr"); });
+    if (hdr_file == _remote_file_list.end()) {
+        std::string msg =
+                fmt::format("can't find hdr file in remote snapshot path: {}", _remote_path);
+        LOG(WARNING) << msg;
+        return Status::RuntimeError(std::move(msg));
+    }
+    _remote_hdr_filename = *hdr_file;
+    _remote_file_list.erase(hdr_file);
+
+    return Status::OK();
+}
+
+Status SnapshotHttpDownloader::_download_hdr_file() {
+    RemoteFileStat remote_hdr_stat;
+    std::string remote_hdr_file_url =
+            fmt::format("{}&file={}/{}", _base_url, _remote_path, _remote_hdr_filename);
+    auto status = _get_http_file_stat(remote_hdr_file_url, &remote_hdr_stat);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get remote hdr file stat: " << remote_hdr_file_url
+                     << ", error: " << status.to_string();
+        return status;
+    }
+
+    std::string hdr_filename = _remote_hdr_filename + ".tmp";
+    std::string hdr_file = _local_path + "/" + hdr_filename;
+    status = _download_http_file(_tablet->data_dir(), remote_hdr_file_url, hdr_file,
+                                 remote_hdr_stat);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to download remote hdr file: " << remote_hdr_file_url
+                     << ", error: " << status.to_string();
+        return status;
+    }
+    _tmp_hdr_file = hdr_file;
+    _remote_hdr_filestat = remote_hdr_stat;
+    return Status::OK();
+}
+
+Status SnapshotHttpDownloader::_link_same_rowset_files() {
+    std::string local_hdr_file_path = _local_path + "/" + _local_hdr_filename;
+
+    // load local tablet meta
+    TabletMetaPB local_tablet_meta;
+    auto status = TabletMeta::load_from_file(local_hdr_file_path, &local_tablet_meta);
+    if (!status.ok()) {
+        // This file might broken because of the partial downloading.
+        LOG(WARNING) << "failed to load local tablet meta: " << local_hdr_file_path
+                     << ", skip link same rowset files, error: " << status.to_string();
+        return Status::OK();
+    }
+
+    // load remote tablet meta
+    TabletMetaPB remote_tablet_meta;
+    status = TabletMeta::load_from_file(_tmp_hdr_file, &remote_tablet_meta);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to load remote tablet meta: " << _tmp_hdr_file
+                     << ", error: " << status.to_string();
+        return status;
+    }
+
+    LOG(INFO) << "link rowset files by compare " << _local_hdr_filename << " and "
+              << _remote_hdr_filename;
+
+    std::unordered_map<std::string, const RowsetMetaPB&> remote_rowset_metas;
+    for (const auto& rowset_meta : remote_tablet_meta.rs_metas()) {
+        if (rowset_meta.has_resource_id() || rowset_meta.rowset_id_v2().empty()) {
+            continue; // skip remote rowsets and rowsets without a v2 rowset id
+        }
+        remote_rowset_metas.insert({rowset_meta.rowset_id_v2(), rowset_meta});
+    }
+
+    // Collect the linked files and merge them into _local_files after the scan, because
+    // inserting into _local_files while iterating over it may rehash and invalidate iterators.
+    std::vector<std::pair<std::string, LocalFileStat>> linked_files;
+    for (const auto& local_rowset_meta : local_tablet_meta.rs_metas()) {
+        // An empty local rowset id would prefix-match every local file below, so skip it too.
+        if (local_rowset_meta.has_resource_id() || !local_rowset_meta.has_source_rowset_id() ||
+            local_rowset_meta.rowset_id_v2().empty()) {
+            continue;
+        }
+
+        auto remote_rowset_meta = remote_rowset_metas.find(local_rowset_meta.source_rowset_id());
+        if (remote_rowset_meta == remote_rowset_metas.end()) {
+            continue;
+        }
+
+        const auto& remote_rowset_id = remote_rowset_meta->first;
+        const auto& remote_rowset_meta_pb = remote_rowset_meta->second;
+        const auto& local_rowset_id = local_rowset_meta.rowset_id_v2();
+        auto remote_tablet_id = remote_rowset_meta_pb.tablet_id();
+        if (local_rowset_meta.start_version() != remote_rowset_meta_pb.start_version() ||
+            local_rowset_meta.end_version() != remote_rowset_meta_pb.end_version()) {
+            continue;
+        }
+
+        LOG(INFO) << "rowset " << local_rowset_id << " was downloaded from remote tablet "
+                  << remote_tablet_id << " rowset " << remote_rowset_id
+                  << ", directly link files instead of downloading";
+
+        // Since the rowset meta are the same, we can link the local rowset files as
+        // the downloaded remote rowset files.
+        for (const auto& [local_file, local_filestat] : _local_files) {
+            if (!local_file.starts_with(local_rowset_id)) {
+                continue;
+            }
+
+            std::string remote_file = local_file;
+            remote_file.replace(0, local_rowset_id.size(), remote_rowset_id);
+            std::string local_file_path = _local_path + "/" + local_file;
+            std::string remote_file_path = _local_path + "/" + remote_file;
+
+            bool exist = true;
+            RETURN_IF_ERROR(io::global_local_filesystem()->exists(remote_file_path, &exist));
+            if (exist) {
+                continue;
+            }
+
+            LOG(INFO) << "link file from " << local_file_path << " to " << remote_file_path;
+            if (!io::global_local_filesystem()->link_file(local_file_path, remote_file_path)) {
+                std::string msg = fmt::format("link file failed from {} to {}, err: {}",
+                                              local_file_path, remote_file_path, strerror(errno));
+                LOG(WARNING) << msg;
+                return Status::InternalError(std::move(msg));
+            }
+
+            linked_files.emplace_back(std::move(remote_file), local_filestat);
+        }
+    }
+
+    for (auto& [linked_file, linked_filestat] : linked_files) {
+        _local_files.insert_or_assign(std::move(linked_file), std::move(linked_filestat));
+    }
+
+    return Status::OK();
+}
+
+Status SnapshotHttpDownloader::_get_remote_file_stats() {
+    for (const auto& filename : _remote_file_list) {
+        if (_report_progress_callback) {
+            RETURN_IF_ERROR(_report_progress_callback());
+        }
+
+        std::string remote_file_url =
+                fmt::format("{}&file={}/{}", _base_url, _remote_path, filename);
+
+        RemoteFileStat remote_filestat;
+        RETURN_IF_ERROR(_get_http_file_stat(remote_file_url, &remote_filestat));
+        _remote_files[filename] = remote_filestat;
+    }
+
+    return Status::OK();
+}
+
+void SnapshotHttpDownloader::_get_need_download_files() {
+    for (const auto& [remote_file, remote_filestat] : _remote_files) {
+        LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size
+                  << ", md5: " << remote_filestat.md5;
+        auto it = _local_files.find(remote_file);
+        if (it == _local_files.end()) {
+            _need_download_files.emplace_back(remote_file);
+            continue;
+        }
+
+        if (auto& local_filestat = it->second; local_filestat.size != remote_filestat.size) {
+            _need_download_files.emplace_back(remote_file);
+            continue;
+        }
+
+        if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) {
+            _need_download_files.emplace_back(remote_file);
+            continue;
+        }
+
+        LOG(INFO) << fmt::format("file {} already exists, skip download url {}", remote_file,
+                                 remote_filestat.url);
+    }
+}
+
+Status SnapshotHttpDownloader::_download_files() {
+    DataDir* data_dir = _tablet->data_dir();
+
+    uint64_t total_file_size = 0;
+    MonotonicStopWatch watch(true);
+    for (auto& filename : _need_download_files) {
+        if (_report_progress_callback) {
+            RETURN_IF_ERROR(_report_progress_callback());
+        }
+
+        auto& remote_filestat = _remote_files[filename];
+        auto file_size = remote_filestat.size;
+        auto& remote_file_url = remote_filestat.url;
+        auto& remote_file_md5 = remote_filestat.md5;
+
+        std::string local_filename;
+        RETURN_IF_ERROR(
+                _snapshot_loader._replace_tablet_id(filename, _local_tablet_id, &local_filename));
+        std::string local_file_path = _local_path + "/" + local_filename;
+
+        RETURN_IF_ERROR(
+                _download_http_file(data_dir, remote_file_url, local_file_path, remote_filestat));
+        total_file_size += file_size;
+
+        // local_files always keep the updated local files
+        _local_files[filename] = LocalFileStat {file_size, remote_file_md5};
+    }
+
+    uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
+    total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
+    double copy_rate = 0.0;
+    if (total_time_ms > 0) {
+        copy_rate = total_file_size / ((double)total_time_ms) / 1000;
+    }
+    LOG(INFO) << fmt::format(
+            "succeed to copy remote tablet {} to local tablet {}, total downloading {} files, "
+            "total file size: {} B, cost: {} ms, rate: {} MB/s",
+            _remote_tablet_id, _local_tablet_id, _need_download_files.size(), total_file_size,
+            total_time_ms, copy_rate);
+
+    return Status::OK();
+}
+
+Status SnapshotHttpDownloader::_install_remote_hdr_file() {
+    std::string local_hdr_filename;
+    RETURN_IF_ERROR(_snapshot_loader._replace_tablet_id(_remote_hdr_filename, _local_tablet_id,
+                                                        &local_hdr_filename));
+    std::string local_hdr_file_path = _local_path + "/" + local_hdr_filename;
+
+    auto status = io::global_local_filesystem()->rename(_tmp_hdr_file, local_hdr_file_path);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to install remote hdr file from: " << _tmp_hdr_file << " to "
+                     << local_hdr_file_path << ", error: " << status.to_string();
+        return Status::RuntimeError("failed install remote hdr file {} from tmp {}, error: {}",
+                                    local_hdr_file_path, _tmp_hdr_file, status.to_string());
+    }
+
+    // also save the hdr file into remote files.
+    _remote_files[_remote_hdr_filename] = _remote_hdr_filestat;
+
+    return Status::OK();
+}
+
+Status SnapshotHttpDownloader::_delete_orphan_files() {
+    // local_files: contain all remote files and local files
+    // finally, delete local files which are not in remote
+    for (const auto& [local_file, local_filestat] : _local_files) {
+        // replace the tablet id in local file name with the remote tablet id,
+        // in order to compare the file name.
+        std::string new_name;
+        Status st = _snapshot_loader._replace_tablet_id(local_file, _remote_tablet_id, &new_name);
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st
+                         << ". ignore it";
+            continue;
+        }
+        VLOG_CRITICAL << "new file name after replace tablet id: " << new_name;
+        const auto& find = _remote_files.find(new_name);
+        if (find != _remote_files.end()) {
+            continue;
+        }
+
+        // delete
+        std::string full_local_file = _local_path + "/" + local_file;
+        LOG(INFO) << "begin to delete local snapshot file: " << full_local_file
+                  << ", it does not exist in remote";
+        if (remove(full_local_file.c_str()) != 0) {
+            LOG(WARNING) << "failed to delete unknown local file: " << full_local_file
+                         << ", error: " << strerror(errno) << ", file size: " << local_filestat.size
+                         << ", ignore it";
+        }
+    }
+    return Status::OK();
+}
+
+Status SnapshotHttpDownloader::download() {
+    // Take a lock to protect the local snapshot path.
+    auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(_local_path);
+
+    // Step 1: Validate local tablet snapshot paths
+    bool res = true;
+    RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(_local_path, &res));
+    if (!res) {
+        std::string msg =
+                fmt::format("snapshot path is not directory or does not exist: {}", _local_path);
+        LOG(WARNING) << msg;
+        return Status::RuntimeError(std::move(msg));
+    }
+
+    // Step 2: get all local files
+    RETURN_IF_ERROR(_load_existing_files());
+
+    // Step 3: Validate remote tablet snapshot paths && remote files map
+    RETURN_IF_ERROR(_list_remote_files());
+
+    // Step 4: download hdr file to a tmp file
+    RETURN_IF_ERROR(_download_hdr_file());
+
+    // Step 5: link same rowset files, if local tablet meta file exists
+    if (!_local_hdr_filename.empty()) {
+        RETURN_IF_ERROR(_link_same_rowset_files());
+    }
+
+    // Step 6: get all remote file stats
+    RETURN_IF_ERROR(_get_remote_file_stats());
+
+    // Step 7: get all need download files & download them
+    _get_need_download_files();
+    if (!_need_download_files.empty()) {
+        RETURN_IF_ERROR(_download_files());
+    }
+
+    // Step 8: install remote hdr file from tmp file
+    RETURN_IF_ERROR(_install_remote_hdr_file());
+
+    // Step 9: delete orphan files
+    RETURN_IF_ERROR(_delete_orphan_files());
+
+    return Status::OK();
+}
+
 SnapshotLoader::SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id, int64_t task_id,
                                const TNetworkAddress& broker_addr,
                                const std::map<std::string, std::string>& prop)
@@ -413,11 +983,6 @@ Status SnapshotLoader::download(const 
std::map<std::string, std::string>& src_to
 Status SnapshotLoader::remote_http_download(
         const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
         std::vector<int64_t>* downloaded_tablet_ids) {
-    LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, 
task id: {}", _job_id,
-                             _task_id);
-
-    constexpr uint32_t kDownloadFileMaxRetry = 3;
-
     // check if job has already been cancelled
     int tmp_counter = 1;
     RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, 
TTaskType::type::DOWNLOAD));
@@ -427,281 +992,28 @@ Status SnapshotLoader::remote_http_download(
     int finished_num = 0;
     int total_num = remote_tablet_snapshots.size();
     for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
         const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
         const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+
         LOG(INFO) << fmt::format(
                 "download snapshots via http. job: {}, task id: {}, local dir: 
{}, remote dir: {}",
                 _job_id, _task_id, local_path, remote_path);
 
-        // Take a lock to protect the local snapshot path.
-        auto local_snapshot_guard = 
LocalSnapshotLock::instance().acquire(local_path);
-
-        // Step 1: Validate local tablet snapshot paths
-        bool res = true;
-        
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(local_path, &res));
-        if (!res) {
-            std::stringstream ss;
-            auto err_msg =
-                    fmt::format("snapshot path is not directory or does not 
exist: {}", local_path);
-            LOG(WARNING) << err_msg;
-            return Status::RuntimeError(err_msg);
-        }
-
-        // Step 2: get all local files
-        struct LocalFileStat {
-            uint64_t size;
-            std::string md5;
-        };
-        std::unordered_map<std::string, LocalFileStat> local_files;
-        std::vector<std::string> existing_files;
-        RETURN_IF_ERROR(_get_existing_files_from_local(local_path, 
&existing_files));
-        for (auto& local_file : existing_files) {
-            // add file size
-            std::string local_file_path = local_path + "/" + local_file;
-            std::error_code ec;
-            uint64_t local_file_size = 
std::filesystem::file_size(local_file_path, ec);
-            if (ec) {
-                LOG(WARNING) << "download file error" << ec.message();
-                return Status::IOError("can't retrive file_size of {}, due to 
{}", local_file_path,
-                                       ec.message());
-            }
-            std::string md5;
-            auto status = 
io::global_local_filesystem()->md5sum(local_file_path, &md5);
-            if (!status.ok()) {
-                LOG(WARNING) << "download file error, local file " << 
local_file_path
-                             << " md5sum: " << status.to_string();
-                return status;
-            }
-            local_files[local_file] = {local_file_size, md5};
-        }
-        existing_files.clear();
-
-        // Step 3: Validate remote tablet snapshot paths && remote files map
-        // key is remote snapshot paths, value is filelist
-        // get all these use http download action
-        // 
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
-        struct RemoteFileStat {
-            std::string url;
-            std::string md5;
-            uint64_t size;
-        };
-        std::unordered_map<std::string, RemoteFileStat> remote_files;
-        const auto& token = remote_tablet_snapshot.remote_token;
-        const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
-
-        // HEAD 
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
-        std::string base_url = 
fmt::format("http://{}:{}/api/_tablet/_download?token={}",
-                                           remote_be_addr.hostname, 
remote_be_addr.port, token);
-        std::string remote_url_prefix = fmt::format("{}&file={}", base_url, 
remote_path);
-
-        LOG(INFO) << "list remote files: " << remote_url_prefix << ", job: " 
<< _job_id
-                  << ", task id: " << _task_id << ", remote be: " << 
remote_be_addr;
-        string file_list_str;
-        auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* 
client) {
-            RETURN_IF_ERROR(client->init(remote_url_prefix));
-            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
-            return client->execute(&file_list_str);
-        };
-        RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 
1, list_files_cb));
-        std::vector<string> filename_list =
-                strings::Split(file_list_str, "\n", strings::SkipWhitespace());
-
-        for (const auto& filename : filename_list) {
-            std::string remote_file_url =
-                    fmt::format("{}&file={}/{}&channel=ingest_binlog", 
base_url,
-                                remote_tablet_snapshot.remote_snapshot_path, 
filename);
-
-            // get file length
-            uint64_t file_size = 0;
-            std::string file_md5;
-            auto get_file_stat_cb = [&remote_file_url, &file_size, 
&file_md5](HttpClient* client) {
-                int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
-                std::string url = remote_file_url;
-                if (config::enable_download_md5sum_check) {
-                    // compute md5sum is time-consuming, so we set a longer 
timeout
-                    timeout_ms = config::download_binlog_meta_timeout_ms * 3;
-                    url = fmt::format("{}&acquire_md5=true", remote_file_url);
-                }
-                RETURN_IF_ERROR(client->init(url));
-                client->set_timeout_ms(timeout_ms);
-                RETURN_IF_ERROR(client->head());
-                RETURN_IF_ERROR(client->get_content_length(&file_size));
-                if (config::enable_download_md5sum_check) {
-                    RETURN_IF_ERROR(client->get_content_md5(&file_md5));
-                }
-                return Status::OK();
-            };
-            RETURN_IF_ERROR(
-                    HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, 
get_file_stat_cb));
-
-            remote_files[filename] = RemoteFileStat {remote_file_url, 
file_md5, file_size};
-        }
-
-        // Step 4: Compare local and remote files && get all need download 
files
-        RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, 
total_num,
-                                      TTaskType::type::DOWNLOAD));
-
-        // get all need download files
-        std::vector<std::string> need_download_files;
-        for (const auto& [remote_file, remote_filestat] : remote_files) {
-            LOG(INFO) << "remote file: " << remote_file << ", size: " << 
remote_filestat.size
-                      << ", md5: " << remote_filestat.md5;
-            auto it = local_files.find(remote_file);
-            if (it == local_files.end()) {
-                need_download_files.emplace_back(remote_file);
-                continue;
-            }
-            if (_end_with(remote_file, ".hdr")) {
-                need_download_files.emplace_back(remote_file);
-                continue;
-            }
-
-            if (auto& local_filestat = it->second; local_filestat.size != 
remote_filestat.size) {
-                need_download_files.emplace_back(remote_file);
-                continue;
-            }
-
-            if (auto& local_filestat = it->second; local_filestat.md5 != 
remote_filestat.md5) {
-                need_download_files.emplace_back(remote_file);
-                continue;
-            }
-
-            LOG(INFO) << fmt::format("file {} already exists, skip download", 
remote_file);
-        }
-
-        auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
         TabletSharedPtr tablet = 
_engine.tablet_manager()->get_tablet(local_tablet_id);
         if (tablet == nullptr) {
-            std::stringstream ss;
-            ss << "failed to get local tablet: " << local_tablet_id;
-            LOG(WARNING) << ss.str();
-            return Status::InternalError(ss.str());
+            std::string msg = fmt::format("failed to get local tablet: {}", 
local_tablet_id);
+            LOG(WARNING) << msg;
+            return Status::RuntimeError(std::move(msg));
         }
-        DataDir* data_dir = tablet->data_dir();
-
-        // download all need download files
-        uint64_t total_file_size = 0;
-        MonotonicStopWatch watch;
-        watch.start();
-        for (auto& filename : need_download_files) {
-            auto& remote_filestat = remote_files[filename];
-            auto file_size = remote_filestat.size;
-            auto& remote_file_url = remote_filestat.url;
-            auto& remote_file_md5 = remote_filestat.md5;
 
-            // check disk capacity
-            if (data_dir->reach_capacity_limit(file_size)) {
-                return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
-                        "reach the capacity limit of path {}, file_size={}", 
data_dir->path(),
-                        file_size);
-            }
-
-            total_file_size += file_size;
-            uint64_t estimate_timeout = file_size / 
config::download_low_speed_limit_kbps / 1024;
-            if (estimate_timeout < config::download_low_speed_time) {
-                estimate_timeout = config::download_low_speed_time;
-            }
-
-            std::string local_filename;
-            RETURN_IF_ERROR(_replace_tablet_id(filename, local_tablet_id, 
&local_filename));
-            std::string local_file_path = local_path + "/" + local_filename;
-
-            LOG(INFO) << "clone begin to download file from: " << 
remote_file_url
-                      << " to: " << local_file_path << ". size(B): " << 
file_size
-                      << ", timeout(s): " << estimate_timeout;
-
-            auto download_cb = [&remote_file_url, &remote_file_md5, 
estimate_timeout,
-                                &local_file_path, file_size](HttpClient* 
client) {
-                RETURN_IF_ERROR(client->init(remote_file_url));
-                client->set_timeout_ms(estimate_timeout * 1000);
-                RETURN_IF_ERROR(client->download(local_file_path));
-
-                std::error_code ec;
-                // Check file length
-                uint64_t local_file_size = 
std::filesystem::file_size(local_file_path, ec);
-                if (ec) {
-                    LOG(WARNING) << "download file error" << ec.message();
-                    return Status::IOError("can't retrive file_size of {}, due 
to {}",
-                                           local_file_path, ec.message());
-                }
-                if (local_file_size != file_size) {
-                    LOG(WARNING) << "download file length error"
-                                 << ", remote_path=" << remote_file_url
-                                 << ", file_size=" << file_size
-                                 << ", local_file_size=" << local_file_size;
-                    return Status::InternalError("downloaded file size is not 
equal");
-                }
-
-                if (!remote_file_md5.empty()) { // keep compatibility
-                    std::string local_file_md5;
-                    
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path,
-                                                                          
&local_file_md5));
-                    if (local_file_md5 != remote_file_md5) {
-                        LOG(WARNING) << "download file md5 error"
-                                     << ", remote_file_url=" << remote_file_url
-                                     << ", local_file_path=" << local_file_path
-                                     << ", remote_file_md5=" << remote_file_md5
-                                     << ", local_file_md5=" << local_file_md5;
-                        return Status::RuntimeError(
-                                "download file {} md5 is not equal, local={}, 
remote={}",
-                                remote_file_url, local_file_md5, 
remote_file_md5);
-                    }
-                }
-
-                return io::global_local_filesystem()->permission(
-                        local_file_path, io::LocalFileSystem::PERMS_OWNER_RW);
-            };
-            auto status = 
HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
-            if (!status.ok()) {
-                LOG(WARNING) << "failed to download file from " << 
remote_file_url
-                             << ", status: " << status.to_string();
-                return status;
-            }
-
-            // local_files always keep the updated local files
-            local_files[filename] = LocalFileStat {file_size, remote_file_md5};
-        }
-
-        uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
-        total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
-        double copy_rate = 0.0;
-        if (total_time_ms > 0) {
-            copy_rate = total_file_size / ((double)total_time_ms) / 1000;
-        }
-        auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
-        LOG(INFO) << fmt::format(
-                "succeed to copy remote tablet {} to local tablet {}, total 
file size: {} B, cost: "
-                "{} ms, rate: {} MB/s",
-                remote_tablet_id, local_tablet_id, total_file_size, 
total_time_ms, copy_rate);
-
-        // local_files: contain all remote files and local files
-        // finally, delete local files which are not in remote
-        for (const auto& [local_file, local_filestat] : local_files) {
-            // replace the tablet id in local file name with the remote tablet 
id,
-            // in order to compare the file name.
-            std::string new_name;
-            Status st = _replace_tablet_id(local_file, remote_tablet_id, 
&new_name);
-            if (!st.ok()) {
-                LOG(WARNING) << "failed to replace tablet id. unknown local 
file: " << st
-                             << ". ignore it";
-                continue;
-            }
-            VLOG_CRITICAL << "new file name after replace tablet id: " << 
new_name;
-            const auto& find = remote_files.find(new_name);
-            if (find != remote_files.end()) {
-                continue;
-            }
-
-            // delete
-            std::string full_local_file = local_path + "/" + local_file;
-            LOG(INFO) << "begin to delete local snapshot file: " << 
full_local_file
-                      << ", it does not exist in remote";
-            if (remove(full_local_file.c_str()) != 0) {
-                LOG(WARNING) << "failed to delete unknown local file: " << 
full_local_file
-                             << ", error: " << strerror(errno)
-                             << ", file size: " << local_filestat.size << ", 
ignore it";
-            }
-        }
+        SnapshotHttpDownloader downloader(remote_tablet_snapshot, 
std::move(tablet), *this);
+        downloader.set_report_progress_callback(
+                [this, &report_counter, &finished_num, &total_num]() {
+                    return _report_every(10, &report_counter, finished_num, 
total_num,
+                                         TTaskType::type::DOWNLOAD);
+                });
+        RETURN_IF_ERROR(downloader.download());
 
         ++finished_num;
     }
@@ -874,6 +1186,25 @@ Status SnapshotLoader::move(const std::string& 
snapshot_path, TabletSharedPtr ta
     return status;
 }
 
+Status SnapshotLoader::_replace_tablet_id(const std::string& file_name, 
int64_t tablet_id,
+                                          std::string* new_file_name) {
+    // eg:
+    // 10007.hdr
+    // 10007_2_2_0_0.idx
+    // 10007_2_2_0_0.dat
+    if (_end_with(file_name, ".hdr")) {
+        std::stringstream ss;
+        ss << tablet_id << ".hdr";
+        *new_file_name = ss.str();
+        return Status::OK();
+    } else if (_end_with(file_name, ".idx") || _end_with(file_name, ".dat")) {
+        *new_file_name = file_name;
+        return Status::OK();
+    } else {
+        return Status::InternalError("invalid tablet file name: {}", 
file_name);
+    }
+}
+
 Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const 
std::string& src_path,
                                                                      int64_t* 
tablet_id,
                                                                      int32_t* 
schema_hash) {
@@ -941,25 +1272,6 @@ Status 
SnapshotLoader::_get_existing_files_from_local(const std::string& local_p
     return Status::OK();
 }
 
-Status SnapshotLoader::_replace_tablet_id(const std::string& file_name, 
int64_t tablet_id,
-                                          std::string* new_file_name) {
-    // eg:
-    // 10007.hdr
-    // 10007_2_2_0_0.idx
-    // 10007_2_2_0_0.dat
-    if (_end_with(file_name, ".hdr")) {
-        std::stringstream ss;
-        ss << tablet_id << ".hdr";
-        *new_file_name = ss.str();
-        return Status::OK();
-    } else if (_end_with(file_name, ".idx") || _end_with(file_name, ".dat")) {
-        *new_file_name = file_name;
-        return Status::OK();
-    } else {
-        return Status::InternalError("invalid tablet file name: {}", 
file_name);
-    }
-}
-
 Status SnapshotLoader::_get_tablet_id_from_remote_path(const std::string& 
remote_path,
                                                        int64_t* tablet_id) {
     // eg:
diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h
index 7b1d5a0d942..4913af6dd86 100644
--- a/be/src/runtime/snapshot_loader.h
+++ b/be/src/runtime/snapshot_loader.h
@@ -32,6 +32,7 @@ namespace io {
 class RemoteFileSystem;
 } // namespace io
 
+class DataDir;
 class TRemoteTabletSnapshot;
 class StorageEngine;
 
@@ -63,6 +64,8 @@ class ExecEnv;
  *
  */
 class SnapshotLoader {
+    friend class SnapshotHttpDownloader;
+
 public:
     SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id, 
int64_t task_id,
                    const TNetworkAddress& broker_addr = {},
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 3513e35b541..a2443c3cc4d 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -236,6 +236,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
         status.to_thrift(&tstatus);
         return;
     }
+    // save source rowset id and tablet id
+    rowset_meta_pb.set_source_rowset_id(remote_rowset_id);
+    rowset_meta_pb.set_source_tablet_id(request.remote_tablet_id);
     // rewrite rowset meta
     rowset_meta_pb.set_tablet_id(local_tablet_id);
     rowset_meta_pb.set_partition_id(partition_id);
@@ -494,6 +497,10 @@ void _ingest_binlog(StorageEngine& engine, 
IngestBinlogArg* arg) {
             std::string remote_file_md5;
             RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
 
+            LOG(INFO) << "download segment index file to " << 
local_segment_index_path
+                      << ", remote md5: " << remote_file_md5
+                      << ", remote size: " << segment_index_file_size;
+
             std::error_code ec;
             // Check file length
             uint64_t local_index_file_size =
@@ -546,7 +553,6 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
     RowsetSharedPtr rowset;
     status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
                                           local_tablet->tablet_path(), 
rowset_meta, &rowset);
-
     if (!status) {
         LOG(WARNING) << "failed to create rowset from rowset meta for remote 
tablet"
                      << ". rowset_id: " << rowset_meta_pb.rowset_id()
diff --git a/be/src/util/stopwatch.hpp b/be/src/util/stopwatch.hpp
index 9dc3ee74152..f0f9442bcf5 100644
--- a/be/src/util/stopwatch.hpp
+++ b/be/src/util/stopwatch.hpp
@@ -33,9 +33,12 @@ namespace doris {
 template <clockid_t Clock>
 class CustomStopWatch {
 public:
-    CustomStopWatch() {
+    CustomStopWatch(bool auto_start = false) {
         _total_time = 0;
         _running = false;
+        if (auto_start) {
+            start();
+        }
     }
 
     timespec start_time() const { return _start; }
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 1b1afbb9d2b..8904ffc74e4 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -118,6 +118,9 @@ message RowsetMetaPB {
     // to indicate whether the data between the segments overlap
     optional SegmentsOverlapPB segments_overlap_pb = 51 [default = 
OVERLAP_UNKNOWN];
     optional int64 compaction_level = 52 [default = 0];
+    // For backup/restore, record the tablet id and rowset id of the source 
cluster.
+    optional int64 source_tablet_id = 53;
+    optional string source_rowset_id = 54;
 
     // For cloud
     // for data recycling
@@ -203,6 +206,10 @@ message RowsetMetaCloudPB {
     // to indicate whether the data between the segments overlap
     optional SegmentsOverlapPB segments_overlap_pb = 51 [default = 
OVERLAP_UNKNOWN];
 
+    // For backup/restore, record the tablet id and rowset id of the source 
cluster.
+    optional int64 source_tablet_id = 53;
+    optional string source_rowset_id = 54;
+
     // cloud
     // the field is a vector, rename it
     repeated int64 segments_file_size = 100;
diff --git a/regression-test/suites/ccr_syncer_p1/test_backup_restore.groovy 
b/regression-test/suites/ccr_syncer_p1/test_backup_restore.groovy
index 18c33cc72d0..c65b0aea272 100644
--- a/regression-test/suites/ccr_syncer_p1/test_backup_restore.groovy
+++ b/regression-test/suites/ccr_syncer_p1/test_backup_restore.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_backup_restore") {
+suite("test_backup_restore_ccr") {
 
     def syncer = getSyncer()
     if (!syncer.checkEnableFeatureBinlog()) {
@@ -66,4 +66,11 @@ suite("test_backup_restore") {
     target_sql " sync "
     res = target_sql "SELECT * FROM ${tableName}"
     assertEquals(res.size(), insert_num)
+
+    logger.info("=== Test 2: restore again ===")
+    assertTrue(syncer.restoreSnapshot(true))
+    syncer.waitTargetRestoreFinish()
+    target_sql " sync "
+    res = target_sql "SELECT * FROM ${tableName}"
+    assertEquals(res.size(), insert_num)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to