This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 7fc4ddcb3a8 [chore](be) Acquire and check MD5 digest of the file to download (#37420) 7fc4ddcb3a8 is described below commit 7fc4ddcb3a8bd39007856b62a93e7fece9d88388 Author: walter <w41te...@gmail.com> AuthorDate: Fri Jul 26 14:03:02 2024 +0800 [chore](be) Acquire and check MD5 digest of the file to download (#37420) Cherry-pick #35807, #36621, #36726 --- be/src/http/action/download_action.cpp | 21 +++--- be/src/http/action/download_binlog_action.cpp | 9 ++- be/src/http/http_client.cpp | 46 ++++++++++++- be/src/http/http_client.h | 5 +- be/src/http/utils.cpp | 30 ++++++--- be/src/http/utils.h | 3 +- be/src/runtime/snapshot_loader.cpp | 89 +++++++++++++++++-------- be/src/service/backend_service.cpp | 56 ++++++++++++++-- be/test/http/http_client_test.cpp | 96 +++++++++++++++++++++++++++ 9 files changed, 297 insertions(+), 58 deletions(-) diff --git a/be/src/http/action/download_action.cpp b/be/src/http/action/download_action.cpp index 720b9d65fa3..680bcc19e70 100644 --- a/be/src/http/action/download_action.cpp +++ b/be/src/http/action/download_action.cpp @@ -17,9 +17,7 @@ #include "http/action/download_action.h" -#include <algorithm> #include <memory> -#include <sstream> #include <string> #include <utility> @@ -33,10 +31,11 @@ namespace doris { namespace { -static const std::string FILE_PARAMETER = "file"; -static const std::string TOKEN_PARAMETER = "token"; -static const std::string CHANNEL_PARAMETER = "channel"; -static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog"; +const std::string FILE_PARAMETER = "file"; +const std::string TOKEN_PARAMETER = "token"; +const std::string CHANNEL_PARAMETER = "channel"; +const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog"; +const std::string ACQUIRE_MD5_PARAMETER = "acquire_md5"; } // namespace DownloadAction::DownloadAction(ExecEnv* exec_env, @@ -46,7 +45,7 @@ DownloadAction::DownloadAction(ExecEnv* exec_env, _download_type(NORMAL), _num_workers(num_workers), _rate_limit_group(std::move(rate_limit_group)) { - for (auto& dir : allow_dirs) { + for (const auto& dir : allow_dirs) { std::string p; Status st = io::global_local_filesystem()->canonicalize(dir, &p); if (!st.ok()) { @@ -114,11 +113,9 @@ void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_par } else { const auto& channel = req->param(CHANNEL_PARAMETER); bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE); - if (ingest_binlog) { - do_file_response(file_param, req, _rate_limit_group.get()); - } else { - do_file_response(file_param, req); - } + bool is_acquire_md5 = !req->param(ACQUIRE_MD5_PARAMETER).empty(); + auto* rate_limit_group = ingest_binlog ? _rate_limit_group.get() : nullptr; + do_file_response(file_param, req, rate_limit_group, is_acquire_md5); } } diff --git a/be/src/http/action/download_binlog_action.cpp b/be/src/http/action/download_binlog_action.cpp index dbe2880d3b4..61d65ca9756 100644 --- a/be/src/http/action/download_binlog_action.cpp +++ b/be/src/http/action/download_binlog_action.cpp @@ -48,6 +48,7 @@ const std::string kBinlogVersionParameter = "binlog_version"; const std::string kRowsetIdParameter = "rowset_id"; const std::string kSegmentIndexParameter = "segment_index"; const std::string kSegmentIndexIdParameter = "segment_index_id"; +const std::string kAcquireMD5Parameter = "acquire_md5"; // get http param, if no value throw exception const auto& get_http_param(HttpRequest* req, const std::string& param_name) { @@ -102,12 +103,14 @@ void handle_get_binlog_info(HttpRequest* req) { void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) { // Step 1: get download file path std::string segment_file_path; + bool is_acquire_md5 = false; try { const auto& tablet_id = get_http_param(req, kTabletIdParameter); auto tablet = get_tablet(tablet_id); const auto& rowset_id = get_http_param(req, kRowsetIdParameter); const auto& segment_index = get_http_param(req, kSegmentIndexParameter); segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index); + is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty(); } catch (const std::exception& e) { HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); LOG(WARNING) << "get download file path failed, error: " << e.what(); @@ -128,7 +131,7 @@ void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rat LOG(WARNING) << "file not exist, file path: " << segment_file_path; return; } - do_file_response(segment_file_path, req, rate_limit_group); + do_file_response(segment_file_path, req, rate_limit_group, is_acquire_md5); } /// handle get segment index file, need tablet_id, rowset_id, segment_index && segment_index_id @@ -136,6 +139,7 @@ void handle_get_segment_index_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) { // Step 1: get download file path std::string segment_index_file_path; + bool is_acquire_md5 = false; try { const auto& tablet_id = get_http_param(req, kTabletIdParameter); auto tablet = get_tablet(tablet_id); @@ -144,6 +148,7 @@ void handle_get_segment_index_file(HttpRequest* req, const auto& segment_index_id = req->param(kSegmentIndexIdParameter); segment_index_file_path = tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id); + is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty(); } catch (const std::exception& e) { HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); LOG(WARNING) << "get download file path failed, error: " << e.what(); @@ -164,7 +169,7 @@ void handle_get_segment_index_file(HttpRequest* req, LOG(WARNING) << "file not exist, file path: " << segment_index_file_path; return; } - do_file_response(segment_index_file_path, req, rate_limit_group); + do_file_response(segment_index_file_path, req, rate_limit_group, is_acquire_md5); } void handle_get_rowset_meta(HttpRequest* req) { diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index 3fe52bf4912..218802878bd 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -24,12 +24,36 @@ #include <ostream> #include "common/config.h" +#include "http/http_headers.h" #include "http/http_status.h" #include "util/stack_util.h" namespace doris { -HttpClient::HttpClient() {} +static const char* header_error_msg(CURLHcode code) { + switch (code) { + case CURLHE_OK: + return "OK"; + case CURLHE_BADINDEX: + return "header exists but not with this index "; + case CURLHE_MISSING: + return "no such header exists"; + case CURLHE_NOHEADERS: + return "no headers at all exist (yet)"; + case CURLHE_NOREQUEST: + return "no request with this number was used"; + case CURLHE_OUT_OF_MEMORY: + return "out of memory while processing"; + case CURLHE_BAD_ARGUMENT: + return "a function argument was not okay"; + case CURLHE_NOT_BUILT_IN: + return "curl_easy_header() was disabled in the build"; + default: + return "unknown"; + } +} + +HttpClient::HttpClient() = default; HttpClient::~HttpClient() { if (_curl != nullptr) { @@ -92,7 +116,7 @@ Status HttpClient::init(const std::string& url, bool set_fail_on_error) { } curl_write_callback callback = [](char* buffer, size_t size, size_t nmemb, void* param) { - HttpClient* client = (HttpClient*)param; + auto* client = (HttpClient*)param; return client->on_response_data(buffer, size * nmemb); }; @@ -181,6 +205,24 @@ Status HttpClient::execute(const std::function<bool(const void* data, size_t len return Status::OK(); } +Status HttpClient::get_content_md5(std::string* md5) const { + struct curl_header* header_ptr; + auto code = curl_easy_header(_curl, HttpHeaders::CONTENT_MD5, 0, CURLH_HEADER, 0, &header_ptr); + if (code == CURLHE_MISSING || code == CURLHE_NOHEADERS) { + // no such headers exists + md5->clear(); + return Status::OK(); + } else if (code != CURLHE_OK) { + auto msg = fmt::format("failed to get http header {}: {} ({})", HttpHeaders::CONTENT_MD5, + header_error_msg(code), code); + LOG(WARNING) << msg << ", trace=" << get_stack_trace(); + return Status::HttpError(std::move(msg)); + } + + *md5 = header_ptr->value; + return Status::OK(); +} + Status HttpClient::download(const std::string& local_path) { // set method to GET set_method(GET); diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index d1b2e1b47af..a6ab49c1e8a 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -106,7 +106,7 @@ public: if (cl < 0) { return Status::InternalError( fmt::format("failed to get content length, it should be a positive value, " - "actrual is : {}", + "actual is : {}", cl)); } *length = cl; @@ -115,6 +115,9 @@ public: return Status::InternalError("failed to get content length. err code: {}", code); } + // Get the value of the header CONTENT-MD5. The output is empty if no such header exists. + Status get_content_md5(std::string* md5) const; + long get_http_status() const { long code; curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &code); diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index 3dd0b839022..93a3d92b32e 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -19,6 +19,7 @@ #include <fcntl.h> #include <stdint.h> +#include <sys/mman.h> #include <sys/stat.h> #include <unistd.h> @@ -36,6 +37,7 @@ #include "http/http_status.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" +#include "util/md5.h" #include "util/path_util.h" #include "util/url_coding.h" @@ -51,7 +53,7 @@ std::string encode_basic_auth(const std::string& user, const std::string& passwd bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* passwd) { const char k_basic[] = "Basic "; - auto& auth = req.header(HttpHeaders::AUTHORIZATION); + const auto& auth = req.header(HttpHeaders::AUTHORIZATION); if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) != 0) { return false; } @@ -103,25 +105,24 @@ std::string get_content_type(const std::string& file_name) { std::string file_ext = path_util::file_extension(file_name); VLOG_TRACE << "file_name: " << file_name << "; file extension: [" << file_ext << "]"; if (file_ext == std::string(".html") || file_ext == std::string(".htm")) { - return std::string("text/html; charset=utf-8"); + return "text/html; charset=utf-8"; } else if (file_ext == std::string(".js")) { - return std::string("application/javascript; charset=utf-8"); + return "application/javascript; charset=utf-8"; } else if (file_ext == std::string(".css")) { - return std::string("text/css; charset=utf-8"); + return "text/css; charset=utf-8"; } else if (file_ext == std::string(".txt")) { - return std::string("text/plain; charset=utf-8"); + return "text/plain; charset=utf-8"; } else if (file_ext == std::string(".png")) { - return std::string("image/png"); + return "image/png"; } else if (file_ext == std::string(".ico")) { - return std::string("image/x-icon"); + return "image/x-icon"; } else { return "text/plain; charset=utf-8"; } - return ""; } void do_file_response(const std::string& file_path, HttpRequest* req, - bufferevent_rate_limit_group* rate_limit_group) { + bufferevent_rate_limit_group* rate_limit_group, bool is_acquire_md5) { if (file_path.find("..") != std::string::npos) { LOG(WARNING) << "Not allowed to read relative path: " << file_path; HttpChannel::send_error(req, HttpStatus::FORBIDDEN); @@ -155,6 +156,17 @@ void do_file_response(const std::string& file_path, HttpRequest* req, req->add_output_header(HttpHeaders::CONTENT_TYPE, get_content_type(file_path).c_str()); + if (is_acquire_md5) { + Md5Digest md5; + + void* buf = mmap(nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0); + md5.update(buf, file_size); + md5.digest(); + munmap(buf, file_size); + + req->add_output_header(HttpHeaders::CONTENT_MD5, md5.hex().c_str()); + } + if (req->method() == HttpMethod::HEAD) { close(fd); req->add_output_header(HttpHeaders::CONTENT_LENGTH, std::to_string(file_size).c_str()); diff --git a/be/src/http/utils.h b/be/src/http/utils.h index 2d1e13fbe4e..7369f710537 100644 --- a/be/src/http/utils.h +++ b/be/src/http/utils.h @@ -37,7 +37,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth); void do_file_response(const std::string& dir_path, HttpRequest* req, - bufferevent_rate_limit_group* rate_limit_group = nullptr); + bufferevent_rate_limit_group* rate_limit_group = nullptr, + bool is_acquire_md5 = false); void do_dir_response(const std::string& dir_path, HttpRequest* req); diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 661ed17fc17..b0f46774d2b 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -392,8 +392,8 @@ Status SnapshotLoader::remote_http_download( // Step before, validate all remote // Step 1: Validate local tablet snapshot paths - for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { - auto& path = remote_tablet_snapshot.local_snapshot_path; + for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { + const auto& path = remote_tablet_snapshot.local_snapshot_path; bool res = true; RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); if (!res) { @@ -408,10 +408,10 @@ Status SnapshotLoader::remote_http_download( // Step 2: get all local files struct LocalFileStat { uint64_t size; - // TODO(Drogon): add md5sum + std::string md5; }; std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map; - for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { const auto& local_path = remote_tablet_snapshot.local_snapshot_path; std::vector<std::string> local_files; RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); @@ -427,7 +427,14 @@ Status SnapshotLoader::remote_http_download( return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, ec.message()); } - local_filestat[local_file] = {local_file_size}; + std::string md5; + auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5); + if (!status.ok()) { + LOG(WARNING) << "download file error, local file " << local_file_path + << " md5sum: " << status.to_string(); + return status; + } + local_filestat[local_file] = {local_file_size, md5}; } } @@ -440,22 +447,22 @@ Status SnapshotLoader::remote_http_download( int total_num = remote_tablet_snapshots.size(); int finished_num = 0; struct RemoteFileStat { - // TODO(Drogon): Add md5sum std::string url; + std::string md5; uint64_t size; }; std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>> remote_files_map; - for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; auto& remote_files = remote_files_map[remote_path]; const auto& token = remote_tablet_snapshot.remote_token; const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr; // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/ - std::string remote_url_prefix = - fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}", - remote_be_addr.hostname, remote_be_addr.port, token, remote_path); + std::string base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}", + remote_be_addr.hostname, remote_be_addr.port, token); + std::string remote_url_prefix = fmt::format("{}&file={}", base_url, remote_path); string file_list_str; auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) { @@ -468,30 +475,31 @@ Status SnapshotLoader::remote_http_download( strings::Split(file_list_str, "\n", strings::SkipWhitespace()); for (const auto& filename : filename_list) { - std::string remote_file_url = fmt::format( - "http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog", - remote_tablet_snapshot.remote_be_addr.hostname, - remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token, - remote_tablet_snapshot.remote_snapshot_path, filename); + std::string remote_file_url = + fmt::format("{}&file={}/{}&channel=ingest_binlog", base_url, + remote_tablet_snapshot.remote_snapshot_path, filename); // get file length uint64_t file_size = 0; - auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* client) { - RETURN_IF_ERROR(client->init(remote_file_url)); + std::string file_md5; + auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) { + std::string url = fmt::format("{}&acquire_md5=true", remote_file_url); + RETURN_IF_ERROR(client->init(url)); client->set_timeout_ms(kGetLengthTimeout * 1000); RETURN_IF_ERROR(client->head()); RETURN_IF_ERROR(client->get_content_length(&file_size)); + RETURN_IF_ERROR(client->get_content_md5(&file_md5)); return Status::OK(); }; RETURN_IF_ERROR( - HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_size_cb)); + HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb)); - remote_files[filename] = RemoteFileStat {remote_file_url, file_size}; + remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size}; } } // Step 4: Compare local and remote files && get all need download files - for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, TTaskType::type::DOWNLOAD)); @@ -504,8 +512,8 @@ Status SnapshotLoader::remote_http_download( // get all need download files std::vector<std::string> need_download_files; for (const auto& [remote_file, remote_filestat] : remote_files) { - LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file, - remote_filestat.size); + LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size + << ", md5: " << remote_filestat.md5; auto it = local_files.find(remote_file); if (it == local_files.end()) { need_download_files.emplace_back(remote_file); @@ -520,7 +528,11 @@ Status SnapshotLoader::remote_http_download( need_download_files.emplace_back(remote_file); continue; } - // TODO(Drogon): check by md5sum, if not match then download + + if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) { + need_download_files.emplace_back(remote_file); + continue; + } LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file); } @@ -544,6 +556,7 @@ Status SnapshotLoader::remote_http_download( auto& remote_filestat = remote_files[filename]; auto file_size = remote_filestat.size; auto& remote_file_url = remote_filestat.url; + auto& remote_file_md5 = remote_filestat.md5; // check disk capacity if (data_dir->reach_capacity_limit(file_size)) { @@ -564,8 +577,8 @@ Status SnapshotLoader::remote_http_download( << " to: " << local_file_path << ". size(B): " << file_size << ", timeout(s): " << estimate_timeout; - auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path, - file_size](HttpClient* client) { + auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout, + &local_file_path, file_size](HttpClient* client) { RETURN_IF_ERROR(client->init(remote_file_url)); client->set_timeout_ms(estimate_timeout * 1000); RETURN_IF_ERROR(client->download(local_file_path)); @@ -585,13 +598,35 @@ Status SnapshotLoader::remote_http_download( << ", local_file_size=" << local_file_size; return Status::InternalError("downloaded file size is not equal"); } + + if (!remote_file_md5.empty()) { // keep compatibility + std::string local_file_md5; + RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path, + &local_file_md5)); + if (local_file_md5 != remote_file_md5) { + LOG(WARNING) << "download file md5 error" + << ", remote_file_url=" << remote_file_url + << ", local_file_path=" << local_file_path + << ", remote_file_md5=" << remote_file_md5 + << ", local_file_md5=" << local_file_md5; + return Status::RuntimeError( + "download file {} md5 is not equal, local={}, remote={}", + remote_file_url, local_file_md5, remote_file_md5); + } + } + return io::global_local_filesystem()->permission( local_file_path, io::LocalFileSystem::PERMS_OWNER_RW); }; - RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb)); + auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb); + if (!status.ok()) { + LOG(WARNING) << "failed to download file from " << remote_file_url + << ", status: " << status.to_string(); + return status; + } // local_files always keep the updated local files - local_files[filename] = LocalFileStat {file_size}; + local_files[filename] = LocalFileStat {file_size, remote_file_md5}; } uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000; diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 2221eea5428..73f9de81cf1 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -262,7 +262,8 @@ void _ingest_binlog(IngestBinlogArg* arg) { // Step 5.3: get all segment files for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { auto segment_file_size = segment_file_sizes[segment_index]; - auto get_segment_file_url = segment_file_urls[segment_index]; + auto get_segment_file_url = + fmt::format("{}&acquire_md5=true", segment_file_urls[segment_index]); uint64_t estimate_timeout = segment_file_size / config::download_low_speed_limit_kbps / 1024; @@ -281,6 +282,12 @@ void _ingest_binlog(IngestBinlogArg* arg) { RETURN_IF_ERROR(client->download(local_segment_path)); download_success_files.push_back(local_segment_path); + std::string remote_file_md5; + RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + LOG(INFO) << "download segment file to " << local_segment_path + << ", remote md5: " << remote_file_md5 + << ", remote size: " << segment_file_size; + std::error_code ec; // Check file length uint64_t local_file_size = std::filesystem::file_size(local_segment_path, ec); @@ -289,13 +296,32 @@ void _ingest_binlog(IngestBinlogArg* arg) { return Status::IOError("can't retrive file_size of {}, due to {}", local_segment_path, ec.message()); } + if (local_file_size != segment_file_size) { LOG(WARNING) << "download file length error" << ", get_segment_file_url=" << get_segment_file_url << ", file_size=" << segment_file_size << ", local_file_size=" << local_file_size; - return Status::InternalError("downloaded file size is not equal"); + return Status::RuntimeError( + "downloaded file size is not equal, local={}, remote={}", local_file_size, + segment_file_size); + } + + if (!remote_file_md5.empty()) { // keep compatibility + std::string local_file_md5; + RETURN_IF_ERROR( + io::global_local_filesystem()->md5sum(local_segment_path, &local_file_md5)); + if (local_file_md5 != remote_file_md5) { + LOG(WARNING) << "download file md5 error" + << ", get_segment_file_url=" << get_segment_file_url + << ", remote_file_md5=" << remote_file_md5 + << ", local_file_md5=" << local_file_md5; + return Status::RuntimeError( + "download file md5 is not equal, local={}, remote={}", local_file_md5, + remote_file_md5); + } } + return io::global_local_filesystem()->permission(local_segment_path, io::LocalFileSystem::PERMS_OWNER_RW); }; @@ -369,7 +395,8 @@ void _ingest_binlog(IngestBinlogArg* arg) { DCHECK(segment_index_file_names.size() == segment_index_file_urls.size()); for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) { auto segment_index_file_size = segment_index_file_sizes[i]; - auto get_segment_index_file_url = segment_index_file_urls[i]; + auto get_segment_index_file_url = + fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]); uint64_t estimate_timeout = segment_index_file_size / config::download_low_speed_limit_kbps / 1024; @@ -388,6 +415,9 @@ void _ingest_binlog(IngestBinlogArg* arg) { RETURN_IF_ERROR(client->download(local_segment_index_path)); download_success_files.push_back(local_segment_index_path); + std::string remote_file_md5; + RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + std::error_code ec; // Check file length uint64_t local_index_file_size = @@ -402,8 +432,26 @@ void _ingest_binlog(IngestBinlogArg* arg) { << ", get_segment_index_file_url=" << get_segment_index_file_url << ", index_file_size=" << segment_index_file_size << ", local_index_file_size=" << local_index_file_size; - return Status::InternalError("downloaded index file size is not equal"); + return Status::RuntimeError( + "downloaded index file size is not equal, local={}, remote={}", + local_index_file_size, segment_index_file_size); } + + if (!remote_file_md5.empty()) { // keep compatibility + std::string local_file_md5; + RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_segment_index_path, + &local_file_md5)); + if (local_file_md5 != remote_file_md5) { + LOG(WARNING) << "download file md5 error" + << ", get_segment_index_file_url=" << get_segment_index_file_url + << ", remote_file_md5=" << remote_file_md5 + << ", local_file_md5=" << local_file_md5; + return Status::RuntimeError( + "download file md5 is not equal, local={}, remote={}", local_file_md5, + remote_file_md5); + } + } + return io::global_local_filesystem()->permission(local_segment_index_path, io::LocalFileSystem::PERMS_OWNER_RW); }; diff --git a/be/test/http/http_client_test.cpp b/be/test/http/http_client_test.cpp index ae56bd9712f..820fcb53327 100644 --- a/be/test/http/http_client_test.cpp +++ b/be/test/http/http_client_test.cpp @@ -17,8 +17,11 @@ #include "http/http_client.h" +#include <fcntl.h> #include <gtest/gtest-message.h> #include <gtest/gtest-test-part.h> +#include <sys/mman.h> +#include <sys/stat.h> #include <unistd.h> #include <boost/algorithm/string/predicate.hpp> @@ -30,6 +33,7 @@ #include "http/http_headers.h" #include "http/http_request.h" #include "http/utils.h" +#include "util/md5.h" namespace doris { @@ -43,8 +47,15 @@ public: return; } req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4"); + bool is_acquire_md5 = !req->param("acquire_md5").empty(); if (req->method() == HttpMethod::HEAD) { req->add_output_header(HttpHeaders::CONTENT_LENGTH, std::to_string(5).c_str()); + if (is_acquire_md5) { + Md5Digest md5; + md5.update("md5sum", 6); + md5.digest(); + req->add_output_header(HttpHeaders::CONTENT_MD5, md5.hex().c_str()); + } HttpChannel::send_reply(req); } else { std::string response = "test1"; @@ -80,6 +91,13 @@ public: } }; +class HttpDownloadFileHandler : public HttpHandler { +public: + void handle(HttpRequest* req) override { + do_file_response("/proc/self/exe", req, nullptr, true); + } +}; + static EvHttpServer* s_server = nullptr; static int real_port = 0; static std::string hostname = ""; @@ -87,6 +105,7 @@ static std::string hostname = ""; static HttpClientTestSimpleGetHandler s_simple_get_handler; static HttpClientTestSimplePostHandler s_simple_post_handler; static HttpNotFoundHandler s_not_found_handler; +static HttpDownloadFileHandler s_download_file_handler; class HttpClientTest : public testing::Test { public: @@ -99,6 +118,7 @@ public: s_server->register_handler(HEAD, "/simple_get", &s_simple_get_handler); s_server->register_handler(POST, "/simple_post", &s_simple_post_handler); s_server->register_handler(GET, "/not_found", &s_not_found_handler); + s_server->register_handler(HEAD, "/download_file", &s_download_file_handler); s_server->start(); real_port = s_server->get_real_port(); EXPECT_NE(0, real_port); @@ -203,4 +223,80 @@ TEST_F(HttpClientTest, not_found) { EXPECT_FALSE(status.ok()); } +TEST_F(HttpClientTest, header_content_md5) { + std::string url = hostname + "/simple_get"; + + { + // without md5 + HttpClient client; + auto st = client.init(url); + EXPECT_TRUE(st.ok()); + client.set_method(HEAD); + client.set_basic_auth("test1", ""); + st = client.execute(); + EXPECT_TRUE(st.ok()); + uint64_t len = 0; + st = client.get_content_length(&len); + EXPECT_TRUE(st.ok()); + EXPECT_EQ(5, len); + std::string md5; + st = client.get_content_md5(&md5); + EXPECT_TRUE(st.ok()); + EXPECT_TRUE(md5.empty()); + } + + { + // with md5 + HttpClient client; + auto st = client.init(url + "?acquire_md5=true"); + EXPECT_TRUE(st.ok()); + client.set_method(HEAD); + client.set_basic_auth("test1", ""); + st = client.execute(); + EXPECT_TRUE(st.ok()); + uint64_t len = 0; + st = client.get_content_length(&len); + EXPECT_TRUE(st.ok()); + EXPECT_EQ(5, len); + std::string md5_value; + st = client.get_content_md5(&md5_value); + EXPECT_TRUE(st.ok()); + + Md5Digest md5; + md5.update("md5sum", 6); + md5.digest(); + EXPECT_EQ(md5_value, md5.hex()); + } +} + +TEST_F(HttpClientTest, download_file_md5) { + std::string url = hostname + "/download_file"; + HttpClient client; + auto st = client.init(url); + EXPECT_TRUE(st.ok()); + client.set_method(HEAD); + client.set_basic_auth("test1", ""); + st = client.execute(); + EXPECT_TRUE(st.ok()); + + std::string md5_value; + st = client.get_content_md5(&md5_value); + EXPECT_TRUE(st.ok()); + + int fd = open("/proc/self/exe", O_RDONLY); + ASSERT_TRUE(fd >= 0); + struct stat stat; + ASSERT_TRUE(fstat(fd, &stat) >= 0); + + int64_t file_size = stat.st_size; + Md5Digest md5; + void* buf = mmap(nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0); + md5.update(buf, file_size); + md5.digest(); + munmap(buf, file_size); + + EXPECT_EQ(md5_value, md5.hex()); + close(fd); +} + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org