This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7fc4ddcb3a8 [chore](be) Acquire and check MD5 digest of the file to download (#37420)
7fc4ddcb3a8 is described below

commit 7fc4ddcb3a8bd39007856b62a93e7fece9d88388
Author: walter <w41te...@gmail.com>
AuthorDate: Fri Jul 26 14:03:02 2024 +0800

    [chore](be) Acquire and check MD5 digest of the file to download (#37420)
    
    Cherry-pick #35807, #36621, #36726
---
 be/src/http/action/download_action.cpp        | 21 +++---
 be/src/http/action/download_binlog_action.cpp |  9 ++-
 be/src/http/http_client.cpp                   | 46 ++++++++++++-
 be/src/http/http_client.h                     |  5 +-
 be/src/http/utils.cpp                         | 30 ++++++---
 be/src/http/utils.h                           |  3 +-
 be/src/runtime/snapshot_loader.cpp            | 89 +++++++++++++++++--------
 be/src/service/backend_service.cpp            | 56 ++++++++++++++--
 be/test/http/http_client_test.cpp             | 96 +++++++++++++++++++++++++++
 9 files changed, 297 insertions(+), 58 deletions(-)

diff --git a/be/src/http/action/download_action.cpp 
b/be/src/http/action/download_action.cpp
index 720b9d65fa3..680bcc19e70 100644
--- a/be/src/http/action/download_action.cpp
+++ b/be/src/http/action/download_action.cpp
@@ -17,9 +17,7 @@
 
 #include "http/action/download_action.h"
 
-#include <algorithm>
 #include <memory>
-#include <sstream>
 #include <string>
 #include <utility>
 
@@ -33,10 +31,11 @@
 
 namespace doris {
 namespace {
-static const std::string FILE_PARAMETER = "file";
-static const std::string TOKEN_PARAMETER = "token";
-static const std::string CHANNEL_PARAMETER = "channel";
-static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+const std::string FILE_PARAMETER = "file";
+const std::string TOKEN_PARAMETER = "token";
+const std::string CHANNEL_PARAMETER = "channel";
+const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+const std::string ACQUIRE_MD5_PARAMETER = "acquire_md5";
 } // namespace
 
 DownloadAction::DownloadAction(ExecEnv* exec_env,
@@ -46,7 +45,7 @@ DownloadAction::DownloadAction(ExecEnv* exec_env,
           _download_type(NORMAL),
           _num_workers(num_workers),
           _rate_limit_group(std::move(rate_limit_group)) {
-    for (auto& dir : allow_dirs) {
+    for (const auto& dir : allow_dirs) {
         std::string p;
         Status st = io::global_local_filesystem()->canonicalize(dir, &p);
         if (!st.ok()) {
@@ -114,11 +113,9 @@ void DownloadAction::handle_normal(HttpRequest* req, const 
std::string& file_par
     } else {
         const auto& channel = req->param(CHANNEL_PARAMETER);
         bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
-        if (ingest_binlog) {
-            do_file_response(file_param, req, _rate_limit_group.get());
-        } else {
-            do_file_response(file_param, req);
-        }
+        bool is_acquire_md5 = !req->param(ACQUIRE_MD5_PARAMETER).empty();
+        auto* rate_limit_group = ingest_binlog ? _rate_limit_group.get() : 
nullptr;
+        do_file_response(file_param, req, rate_limit_group, is_acquire_md5);
     }
 }
 
diff --git a/be/src/http/action/download_binlog_action.cpp 
b/be/src/http/action/download_binlog_action.cpp
index dbe2880d3b4..61d65ca9756 100644
--- a/be/src/http/action/download_binlog_action.cpp
+++ b/be/src/http/action/download_binlog_action.cpp
@@ -48,6 +48,7 @@ const std::string kBinlogVersionParameter = "binlog_version";
 const std::string kRowsetIdParameter = "rowset_id";
 const std::string kSegmentIndexParameter = "segment_index";
 const std::string kSegmentIndexIdParameter = "segment_index_id";
+const std::string kAcquireMD5Parameter = "acquire_md5";
 
 // get http param, if no value throw exception
 const auto& get_http_param(HttpRequest* req, const std::string& param_name) {
@@ -102,12 +103,14 @@ void handle_get_binlog_info(HttpRequest* req) {
 void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* 
rate_limit_group) {
     // Step 1: get download file path
     std::string segment_file_path;
+    bool is_acquire_md5 = false;
     try {
         const auto& tablet_id = get_http_param(req, kTabletIdParameter);
         auto tablet = get_tablet(tablet_id);
         const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
         const auto& segment_index = get_http_param(req, 
kSegmentIndexParameter);
         segment_file_path = tablet->get_segment_filepath(rowset_id, 
segment_index);
+        is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
     } catch (const std::exception& e) {
         HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, 
e.what());
         LOG(WARNING) << "get download file path failed, error: " << e.what();
@@ -128,7 +131,7 @@ void handle_get_segment_file(HttpRequest* req, 
bufferevent_rate_limit_group* rat
         LOG(WARNING) << "file not exist, file path: " << segment_file_path;
         return;
     }
-    do_file_response(segment_file_path, req, rate_limit_group);
+    do_file_response(segment_file_path, req, rate_limit_group, is_acquire_md5);
 }
 
 /// handle get segment index file, need tablet_id, rowset_id, segment_index && 
segment_index_id
@@ -136,6 +139,7 @@ void handle_get_segment_index_file(HttpRequest* req,
                                    bufferevent_rate_limit_group* 
rate_limit_group) {
     // Step 1: get download file path
     std::string segment_index_file_path;
+    bool is_acquire_md5 = false;
     try {
         const auto& tablet_id = get_http_param(req, kTabletIdParameter);
         auto tablet = get_tablet(tablet_id);
@@ -144,6 +148,7 @@ void handle_get_segment_index_file(HttpRequest* req,
         const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
         segment_index_file_path =
                 tablet->get_segment_index_filepath(rowset_id, segment_index, 
segment_index_id);
+        is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
     } catch (const std::exception& e) {
         HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, 
e.what());
         LOG(WARNING) << "get download file path failed, error: " << e.what();
@@ -164,7 +169,7 @@ void handle_get_segment_index_file(HttpRequest* req,
         LOG(WARNING) << "file not exist, file path: " << 
segment_index_file_path;
         return;
     }
-    do_file_response(segment_index_file_path, req, rate_limit_group);
+    do_file_response(segment_index_file_path, req, rate_limit_group, 
is_acquire_md5);
 }
 
 void handle_get_rowset_meta(HttpRequest* req) {
diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp
index 3fe52bf4912..218802878bd 100644
--- a/be/src/http/http_client.cpp
+++ b/be/src/http/http_client.cpp
@@ -24,12 +24,36 @@
 #include <ostream>
 
 #include "common/config.h"
+#include "http/http_headers.h"
 #include "http/http_status.h"
 #include "util/stack_util.h"
 
 namespace doris {
 
-HttpClient::HttpClient() {}
+static const char* header_error_msg(CURLHcode code) {
+    switch (code) {
+    case CURLHE_OK:
+        return "OK";
+    case CURLHE_BADINDEX:
+        return "header exists but not with this index ";
+    case CURLHE_MISSING:
+        return "no such header exists";
+    case CURLHE_NOHEADERS:
+        return "no headers at all exist (yet)";
+    case CURLHE_NOREQUEST:
+        return "no request with this number was used";
+    case CURLHE_OUT_OF_MEMORY:
+        return "out of memory while processing";
+    case CURLHE_BAD_ARGUMENT:
+        return "a function argument was not okay";
+    case CURLHE_NOT_BUILT_IN:
+        return "curl_easy_header() was disabled in the build";
+    default:
+        return "unknown";
+    }
+}
+
+HttpClient::HttpClient() = default;
 
 HttpClient::~HttpClient() {
     if (_curl != nullptr) {
@@ -92,7 +116,7 @@ Status HttpClient::init(const std::string& url, bool 
set_fail_on_error) {
     }
 
     curl_write_callback callback = [](char* buffer, size_t size, size_t nmemb, 
void* param) {
-        HttpClient* client = (HttpClient*)param;
+        auto* client = (HttpClient*)param;
         return client->on_response_data(buffer, size * nmemb);
     };
 
@@ -181,6 +205,24 @@ Status HttpClient::execute(const std::function<bool(const 
void* data, size_t len
     return Status::OK();
 }
 
+Status HttpClient::get_content_md5(std::string* md5) const {
+    struct curl_header* header_ptr;
+    auto code = curl_easy_header(_curl, HttpHeaders::CONTENT_MD5, 0, 
CURLH_HEADER, 0, &header_ptr);
+    if (code == CURLHE_MISSING || code == CURLHE_NOHEADERS) {
+        // no such headers exists
+        md5->clear();
+        return Status::OK();
+    } else if (code != CURLHE_OK) {
+        auto msg = fmt::format("failed to get http header {}: {} ({})", 
HttpHeaders::CONTENT_MD5,
+                               header_error_msg(code), code);
+        LOG(WARNING) << msg << ", trace=" << get_stack_trace();
+        return Status::HttpError(std::move(msg));
+    }
+
+    *md5 = header_ptr->value;
+    return Status::OK();
+}
+
 Status HttpClient::download(const std::string& local_path) {
     // set method to GET
     set_method(GET);
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index d1b2e1b47af..a6ab49c1e8a 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -106,7 +106,7 @@ public:
             if (cl < 0) {
                 return Status::InternalError(
                         fmt::format("failed to get content length, it should 
be a positive value, "
-                                    "actrual is : {}",
+                                    "actual is : {}",
                                     cl));
             }
             *length = cl;
@@ -115,6 +115,9 @@ public:
         return Status::InternalError("failed to get content length. err code: 
{}", code);
     }
 
+    // Get the value of the header CONTENT-MD5. The output is empty if no such 
header exists.
+    Status get_content_md5(std::string* md5) const;
+
     long get_http_status() const {
         long code;
         curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &code);
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index 3dd0b839022..93a3d92b32e 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -19,6 +19,7 @@
 
 #include <fcntl.h>
 #include <stdint.h>
+#include <sys/mman.h>
 #include <sys/stat.h>
 #include <unistd.h>
 
@@ -36,6 +37,7 @@
 #include "http/http_status.h"
 #include "io/fs/file_system.h"
 #include "io/fs/local_file_system.h"
+#include "util/md5.h"
 #include "util/path_util.h"
 #include "util/url_coding.h"
 
@@ -51,7 +53,7 @@ std::string encode_basic_auth(const std::string& user, const 
std::string& passwd
 
 bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* 
passwd) {
     const char k_basic[] = "Basic ";
-    auto& auth = req.header(HttpHeaders::AUTHORIZATION);
+    const auto& auth = req.header(HttpHeaders::AUTHORIZATION);
     if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) != 
0) {
         return false;
     }
@@ -103,25 +105,24 @@ std::string get_content_type(const std::string& 
file_name) {
     std::string file_ext = path_util::file_extension(file_name);
     VLOG_TRACE << "file_name: " << file_name << "; file extension: [" << 
file_ext << "]";
     if (file_ext == std::string(".html") || file_ext == std::string(".htm")) {
-        return std::string("text/html; charset=utf-8");
+        return "text/html; charset=utf-8";
     } else if (file_ext == std::string(".js")) {
-        return std::string("application/javascript; charset=utf-8");
+        return "application/javascript; charset=utf-8";
     } else if (file_ext == std::string(".css")) {
-        return std::string("text/css; charset=utf-8");
+        return "text/css; charset=utf-8";
     } else if (file_ext == std::string(".txt")) {
-        return std::string("text/plain; charset=utf-8");
+        return "text/plain; charset=utf-8";
     } else if (file_ext == std::string(".png")) {
-        return std::string("image/png");
+        return "image/png";
     } else if (file_ext == std::string(".ico")) {
-        return std::string("image/x-icon");
+        return "image/x-icon";
     } else {
         return "text/plain; charset=utf-8";
     }
-    return "";
 }
 
 void do_file_response(const std::string& file_path, HttpRequest* req,
-                      bufferevent_rate_limit_group* rate_limit_group) {
+                      bufferevent_rate_limit_group* rate_limit_group, bool 
is_acquire_md5) {
     if (file_path.find("..") != std::string::npos) {
         LOG(WARNING) << "Not allowed to read relative path: " << file_path;
         HttpChannel::send_error(req, HttpStatus::FORBIDDEN);
@@ -155,6 +156,17 @@ void do_file_response(const std::string& file_path, 
HttpRequest* req,
 
     req->add_output_header(HttpHeaders::CONTENT_TYPE, 
get_content_type(file_path).c_str());
 
+    if (is_acquire_md5) {
+        Md5Digest md5;
+
+        void* buf = mmap(nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
+        md5.update(buf, file_size);
+        md5.digest();
+        munmap(buf, file_size);
+
+        req->add_output_header(HttpHeaders::CONTENT_MD5, md5.hex().c_str());
+    }
+
     if (req->method() == HttpMethod::HEAD) {
         close(fd);
         req->add_output_header(HttpHeaders::CONTENT_LENGTH, 
std::to_string(file_size).c_str());
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index 2d1e13fbe4e..7369f710537 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -37,7 +37,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string* 
user, std::string* pa
 bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth);
 
 void do_file_response(const std::string& dir_path, HttpRequest* req,
-                      bufferevent_rate_limit_group* rate_limit_group = 
nullptr);
+                      bufferevent_rate_limit_group* rate_limit_group = nullptr,
+                      bool is_acquire_md5 = false);
 
 void do_dir_response(const std::string& dir_path, HttpRequest* req);
 
diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index 661ed17fc17..b0f46774d2b 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -392,8 +392,8 @@ Status SnapshotLoader::remote_http_download(
     // Step before, validate all remote
 
     // Step 1: Validate local tablet snapshot paths
-    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
-        auto& path = remote_tablet_snapshot.local_snapshot_path;
+    for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        const auto& path = remote_tablet_snapshot.local_snapshot_path;
         bool res = true;
         RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, 
&res));
         if (!res) {
@@ -408,10 +408,10 @@ Status SnapshotLoader::remote_http_download(
     // Step 2: get all local files
     struct LocalFileStat {
         uint64_t size;
-        // TODO(Drogon): add md5sum
+        std::string md5;
     };
     std::unordered_map<std::string, std::unordered_map<std::string, 
LocalFileStat>> local_files_map;
-    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+    for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
         const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
         std::vector<std::string> local_files;
         RETURN_IF_ERROR(_get_existing_files_from_local(local_path, 
&local_files));
@@ -427,7 +427,14 @@ Status SnapshotLoader::remote_http_download(
                 return Status::IOError("can't retrive file_size of {}, due to 
{}", local_file_path,
                                        ec.message());
             }
-            local_filestat[local_file] = {local_file_size};
+            std::string md5;
+            auto status = 
io::global_local_filesystem()->md5sum(local_file_path, &md5);
+            if (!status.ok()) {
+                LOG(WARNING) << "download file error, local file " << 
local_file_path
+                             << " md5sum: " << status.to_string();
+                return status;
+            }
+            local_filestat[local_file] = {local_file_size, md5};
         }
     }
 
@@ -440,22 +447,22 @@ Status SnapshotLoader::remote_http_download(
     int total_num = remote_tablet_snapshots.size();
     int finished_num = 0;
     struct RemoteFileStat {
-        // TODO(Drogon): Add md5sum
         std::string url;
+        std::string md5;
         uint64_t size;
     };
     std::unordered_map<std::string, std::unordered_map<std::string, 
RemoteFileStat>>
             remote_files_map;
-    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+    for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
         const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
         auto& remote_files = remote_files_map[remote_path];
         const auto& token = remote_tablet_snapshot.remote_token;
         const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
 
         // HEAD 
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
-        std::string remote_url_prefix =
-                fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}",
-                            remote_be_addr.hostname, remote_be_addr.port, token, remote_path);
+        std::string base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}",
+                                           remote_be_addr.hostname, remote_be_addr.port, token);
+        std::string remote_url_prefix = fmt::format("{}&file={}", base_url, remote_path);
 
         string file_list_str;
         auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* 
client) {
@@ -468,30 +475,31 @@ Status SnapshotLoader::remote_http_download(
                 strings::Split(file_list_str, "\n", strings::SkipWhitespace());
 
         for (const auto& filename : filename_list) {
-            std::string remote_file_url = fmt::format(
-                    "http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog",
-                    remote_tablet_snapshot.remote_be_addr.hostname,
-                    remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token,
-                    remote_tablet_snapshot.remote_snapshot_path, filename);
+            std::string remote_file_url =
+                    fmt::format("{}&file={}/{}&channel=ingest_binlog", base_url,
+                                remote_tablet_snapshot.remote_snapshot_path, filename);
 
             // get file length
             uint64_t file_size = 0;
-            auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* 
client) {
-                RETURN_IF_ERROR(client->init(remote_file_url));
+            std::string file_md5;
+            auto get_file_stat_cb = [&remote_file_url, &file_size, 
&file_md5](HttpClient* client) {
+                std::string url = fmt::format("{}&acquire_md5=true", 
remote_file_url);
+                RETURN_IF_ERROR(client->init(url));
                 client->set_timeout_ms(kGetLengthTimeout * 1000);
                 RETURN_IF_ERROR(client->head());
                 RETURN_IF_ERROR(client->get_content_length(&file_size));
+                RETURN_IF_ERROR(client->get_content_md5(&file_md5));
                 return Status::OK();
             };
             RETURN_IF_ERROR(
-                    HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, 
get_file_size_cb));
+                    HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, 
get_file_stat_cb));
 
-            remote_files[filename] = RemoteFileStat {remote_file_url, 
file_size};
+            remote_files[filename] = RemoteFileStat {remote_file_url, 
file_md5, file_size};
         }
     }
 
     // Step 4: Compare local and remote files && get all need download files
-    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+    for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
         RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, 
total_num,
                                       TTaskType::type::DOWNLOAD));
 
@@ -504,8 +512,8 @@ Status SnapshotLoader::remote_http_download(
         // get all need download files
         std::vector<std::string> need_download_files;
         for (const auto& [remote_file, remote_filestat] : remote_files) {
-            LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file,
-                                     remote_filestat.size);
+            LOG(INFO) << "remote file: " << remote_file << ", size: " << 
remote_filestat.size
+                      << ", md5: " << remote_filestat.md5;
             auto it = local_files.find(remote_file);
             if (it == local_files.end()) {
                 need_download_files.emplace_back(remote_file);
@@ -520,7 +528,11 @@ Status SnapshotLoader::remote_http_download(
                 need_download_files.emplace_back(remote_file);
                 continue;
             }
-            // TODO(Drogon): check by md5sum, if not match then download
+
+            if (auto& local_filestat = it->second; local_filestat.md5 != 
remote_filestat.md5) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
 
             LOG(INFO) << fmt::format("file {} already exists, skip download", 
remote_file);
         }
@@ -544,6 +556,7 @@ Status SnapshotLoader::remote_http_download(
             auto& remote_filestat = remote_files[filename];
             auto file_size = remote_filestat.size;
             auto& remote_file_url = remote_filestat.url;
+            auto& remote_file_md5 = remote_filestat.md5;
 
             // check disk capacity
             if (data_dir->reach_capacity_limit(file_size)) {
@@ -564,8 +577,8 @@ Status SnapshotLoader::remote_http_download(
                       << " to: " << local_file_path << ". size(B): " << 
file_size
                       << ", timeout(s): " << estimate_timeout;
 
-            auto download_cb = [&remote_file_url, estimate_timeout, 
&local_file_path,
-                                file_size](HttpClient* client) {
+            auto download_cb = [&remote_file_url, &remote_file_md5, 
estimate_timeout,
+                                &local_file_path, file_size](HttpClient* 
client) {
                 RETURN_IF_ERROR(client->init(remote_file_url));
                 client->set_timeout_ms(estimate_timeout * 1000);
                 RETURN_IF_ERROR(client->download(local_file_path));
@@ -585,13 +598,35 @@ Status SnapshotLoader::remote_http_download(
                                  << ", local_file_size=" << local_file_size;
                     return Status::InternalError("downloaded file size is not 
equal");
                 }
+
+                if (!remote_file_md5.empty()) { // keep compatibility
+                    std::string local_file_md5;
+                    
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path,
+                                                                          
&local_file_md5));
+                    if (local_file_md5 != remote_file_md5) {
+                        LOG(WARNING) << "download file md5 error"
+                                     << ", remote_file_url=" << remote_file_url
+                                     << ", local_file_path=" << local_file_path
+                                     << ", remote_file_md5=" << remote_file_md5
+                                     << ", local_file_md5=" << local_file_md5;
+                        return Status::RuntimeError(
+                                "download file {} md5 is not equal, local={}, 
remote={}",
+                                remote_file_url, local_file_md5, 
remote_file_md5);
+                    }
+                }
+
                 return io::global_local_filesystem()->permission(
                         local_file_path, io::LocalFileSystem::PERMS_OWNER_RW);
             };
-            
RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, 
download_cb));
+            auto status = 
HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
+            if (!status.ok()) {
+                LOG(WARNING) << "failed to download file from " << 
remote_file_url
+                             << ", status: " << status.to_string();
+                return status;
+            }
 
             // local_files always keep the updated local files
-            local_files[filename] = LocalFileStat {file_size};
+            local_files[filename] = LocalFileStat {file_size, remote_file_md5};
         }
 
         uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 2221eea5428..73f9de81cf1 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -262,7 +262,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
     // Step 5.3: get all segment files
     for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
         auto segment_file_size = segment_file_sizes[segment_index];
-        auto get_segment_file_url = segment_file_urls[segment_index];
+        auto get_segment_file_url =
+                fmt::format("{}&acquire_md5=true", 
segment_file_urls[segment_index]);
 
         uint64_t estimate_timeout =
                 segment_file_size / config::download_low_speed_limit_kbps / 
1024;
@@ -281,6 +282,12 @@ void _ingest_binlog(IngestBinlogArg* arg) {
             RETURN_IF_ERROR(client->download(local_segment_path));
             download_success_files.push_back(local_segment_path);
 
+            std::string remote_file_md5;
+            RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+            LOG(INFO) << "download segment file to " << local_segment_path
+                      << ", remote md5: " << remote_file_md5
+                      << ", remote size: " << segment_file_size;
+
             std::error_code ec;
             // Check file length
             uint64_t local_file_size = 
std::filesystem::file_size(local_segment_path, ec);
@@ -289,13 +296,32 @@ void _ingest_binlog(IngestBinlogArg* arg) {
                 return Status::IOError("can't retrive file_size of {}, due to 
{}",
                                        local_segment_path, ec.message());
             }
+
             if (local_file_size != segment_file_size) {
                 LOG(WARNING) << "download file length error"
                              << ", get_segment_file_url=" << 
get_segment_file_url
                              << ", file_size=" << segment_file_size
                              << ", local_file_size=" << local_file_size;
-                return Status::InternalError("downloaded file size is not 
equal");
+                return Status::RuntimeError(
+                        "downloaded file size is not equal, local={}, 
remote={}", local_file_size,
+                        segment_file_size);
+            }
+
+            if (!remote_file_md5.empty()) { // keep compatibility
+                std::string local_file_md5;
+                RETURN_IF_ERROR(
+                        
io::global_local_filesystem()->md5sum(local_segment_path, &local_file_md5));
+                if (local_file_md5 != remote_file_md5) {
+                    LOG(WARNING) << "download file md5 error"
+                                 << ", get_segment_file_url=" << 
get_segment_file_url
+                                 << ", remote_file_md5=" << remote_file_md5
+                                 << ", local_file_md5=" << local_file_md5;
+                    return Status::RuntimeError(
+                            "download file md5 is not equal, local={}, 
remote={}", local_file_md5,
+                            remote_file_md5);
+                }
             }
+
             return 
io::global_local_filesystem()->permission(local_segment_path,
                                                              
io::LocalFileSystem::PERMS_OWNER_RW);
         };
@@ -369,7 +395,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
     DCHECK(segment_index_file_names.size() == segment_index_file_urls.size());
     for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) {
         auto segment_index_file_size = segment_index_file_sizes[i];
-        auto get_segment_index_file_url = segment_index_file_urls[i];
+        auto get_segment_index_file_url =
+                fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]);
 
         uint64_t estimate_timeout =
                 segment_index_file_size / 
config::download_low_speed_limit_kbps / 1024;
@@ -388,6 +415,9 @@ void _ingest_binlog(IngestBinlogArg* arg) {
             RETURN_IF_ERROR(client->download(local_segment_index_path));
             download_success_files.push_back(local_segment_index_path);
 
+            std::string remote_file_md5;
+            RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+
             std::error_code ec;
             // Check file length
             uint64_t local_index_file_size =
@@ -402,8 +432,26 @@ void _ingest_binlog(IngestBinlogArg* arg) {
                              << ", get_segment_index_file_url=" << 
get_segment_index_file_url
                              << ", index_file_size=" << segment_index_file_size
                              << ", local_index_file_size=" << 
local_index_file_size;
-                return Status::InternalError("downloaded index file size is 
not equal");
+                return Status::RuntimeError(
+                        "downloaded index file size is not equal, local={}, 
remote={}",
+                        local_index_file_size, segment_index_file_size);
             }
+
+            if (!remote_file_md5.empty()) { // keep compatibility
+                std::string local_file_md5;
+                
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_segment_index_path,
+                                                                      
&local_file_md5));
+                if (local_file_md5 != remote_file_md5) {
+                    LOG(WARNING) << "download file md5 error"
+                                 << ", get_segment_index_file_url=" << 
get_segment_index_file_url
+                                 << ", remote_file_md5=" << remote_file_md5
+                                 << ", local_file_md5=" << local_file_md5;
+                    return Status::RuntimeError(
+                            "download file md5 is not equal, local={}, 
remote={}", local_file_md5,
+                            remote_file_md5);
+                }
+            }
+
             return 
io::global_local_filesystem()->permission(local_segment_index_path,
                                                              
io::LocalFileSystem::PERMS_OWNER_RW);
         };
diff --git a/be/test/http/http_client_test.cpp 
b/be/test/http/http_client_test.cpp
index ae56bd9712f..820fcb53327 100644
--- a/be/test/http/http_client_test.cpp
+++ b/be/test/http/http_client_test.cpp
@@ -17,8 +17,11 @@
 
 #include "http/http_client.h"
 
+#include <fcntl.h>
 #include <gtest/gtest-message.h>
 #include <gtest/gtest-test-part.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
 #include <unistd.h>
 
 #include <boost/algorithm/string/predicate.hpp>
@@ -30,6 +33,7 @@
 #include "http/http_headers.h"
 #include "http/http_request.h"
 #include "http/utils.h"
+#include "util/md5.h"
 
 namespace doris {
 
@@ -43,8 +47,15 @@ public:
             return;
         }
         req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; 
version=0.0.4");
+        bool is_acquire_md5 = !req->param("acquire_md5").empty();
         if (req->method() == HttpMethod::HEAD) {
             req->add_output_header(HttpHeaders::CONTENT_LENGTH, 
std::to_string(5).c_str());
+            if (is_acquire_md5) {
+                Md5Digest md5;
+                md5.update("md5sum", 6);
+                md5.digest();
+                req->add_output_header(HttpHeaders::CONTENT_MD5, 
md5.hex().c_str());
+            }
             HttpChannel::send_reply(req);
         } else {
             std::string response = "test1";
@@ -80,6 +91,13 @@ public:
     }
 };
 
+// Test-only handler: answers each request routed to it by handing this
+// process's own executable (/proc/self/exe) to do_file_response().
+class HttpDownloadFileHandler : public HttpHandler {
+public:
+    void handle(HttpRequest* req) override {
+        static constexpr char kSelfExePath[] = "/proc/self/exe";
+        // nullptr and `true` are forwarded unchanged. NOTE(review): `true`
+        // presumably asks for the MD5 digest that download_file_md5 reads
+        // back -- confirm against do_file_response() in http/utils.h.
+        do_file_response(kSelfExePath, req, nullptr, true);
+    }
+};
+
 static EvHttpServer* s_server = nullptr;
 static int real_port = 0;
 static std::string hostname = "";
@@ -87,6 +105,7 @@ static std::string hostname = "";
 static HttpClientTestSimpleGetHandler s_simple_get_handler;
 static HttpClientTestSimplePostHandler s_simple_post_handler;
 static HttpNotFoundHandler s_not_found_handler;
+static HttpDownloadFileHandler s_download_file_handler;
 
 class HttpClientTest : public testing::Test {
 public:
@@ -99,6 +118,7 @@ public:
         s_server->register_handler(HEAD, "/simple_get", &s_simple_get_handler);
         s_server->register_handler(POST, "/simple_post", 
&s_simple_post_handler);
         s_server->register_handler(GET, "/not_found", &s_not_found_handler);
+        s_server->register_handler(HEAD, "/download_file", 
&s_download_file_handler);
         s_server->start();
         real_port = s_server->get_real_port();
         EXPECT_NE(0, real_port);
@@ -203,4 +223,80 @@ TEST_F(HttpClientTest, not_found) {
     EXPECT_FALSE(status.ok());
 }
 
+// Checks what HttpClient::get_content_md5() yields after a HEAD request to
+// /simple_get: an empty string for a plain request, and the hex MD5 of the
+// literal "md5sum" when the URL carries the `acquire_md5` parameter.
+TEST_F(HttpClientTest, header_content_md5) {
+    const std::string base_url = hostname + "/simple_get";
+
+    // Sends one authenticated HEAD request to `target`, checks that the
+    // reported content length is 5, and returns the digest string the client
+    // extracted from the reply.
+    auto head_and_fetch_md5 = [](const std::string& target) {
+        HttpClient client;
+        auto st = client.init(target);
+        EXPECT_TRUE(st.ok());
+        client.set_method(HEAD);
+        client.set_basic_auth("test1", "");
+        st = client.execute();
+        EXPECT_TRUE(st.ok());
+
+        uint64_t content_length = 0;
+        st = client.get_content_length(&content_length);
+        EXPECT_TRUE(st.ok());
+        EXPECT_EQ(5, content_length);
+
+        std::string reported_md5;
+        st = client.get_content_md5(&reported_md5);
+        EXPECT_TRUE(st.ok());
+        return reported_md5;
+    };
+
+    // without md5
+    const std::string plain_md5 = head_and_fetch_md5(base_url);
+    EXPECT_TRUE(plain_md5.empty());
+
+    // with md5
+    const std::string md5_value = head_and_fetch_md5(base_url + "?acquire_md5=true");
+    Md5Digest expected;
+    expected.update("md5sum", 6);
+    expected.digest();
+    EXPECT_EQ(md5_value, expected.hex());
+}
+
+// Checks that the digest HttpClient::get_content_md5() reports for a HEAD
+// request to /download_file equals the MD5 of /proc/self/exe computed locally
+// over a read-only mapping of the file.
+TEST_F(HttpClientTest, download_file_md5) {
+    std::string url = hostname + "/download_file";
+    HttpClient client;
+    auto st = client.init(url);
+    EXPECT_TRUE(st.ok());
+    client.set_method(HEAD);
+    client.set_basic_auth("test1", "");
+    st = client.execute();
+    EXPECT_TRUE(st.ok());
+
+    std::string md5_value;
+    st = client.get_content_md5(&md5_value);
+    EXPECT_TRUE(st.ok());
+
+    int fd = open("/proc/self/exe", O_RDONLY);
+    ASSERT_TRUE(fd >= 0);
+    struct stat file_stat;
+    if (fstat(fd, &file_stat) < 0) {
+        close(fd); // FAIL() returns from the test; do not leak the descriptor
+        FAIL() << "fstat on /proc/self/exe failed";
+    }
+
+    int64_t file_size = file_stat.st_size;
+    void* buf = mmap(nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
+    // A mapping holds its own reference to the file, so the descriptor can be
+    // released immediately; no assertion below can leak it.
+    close(fd);
+    // mmap signals failure with MAP_FAILED rather than nullptr. Hashing that
+    // pointer would crash the test binary instead of failing the test.
+    ASSERT_TRUE(buf != MAP_FAILED);
+
+    Md5Digest md5;
+    md5.update(buf, file_size);
+    md5.digest();
+    munmap(buf, file_size);
+
+    EXPECT_EQ(md5_value, md5.hex());
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to