This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e47b3981235 branch-2.1: [chore](binlog) Add download binlog related configs to BE #47412 (#47585) e47b3981235 is described below commit e47b39812350783816ed2512d7b457a2df83f260 Author: walter <maoch...@selectdb.com> AuthorDate: Sat Feb 8 16:23:23 2025 +0800 branch-2.1: [chore](binlog) Add download binlog related configs to BE #47412 (#47585) cherry pick from #47412 --- be/src/common/config.cpp | 4 ++++ be/src/common/config.h | 4 ++++ be/src/runtime/snapshot_loader.cpp | 21 +++++++++++++------ be/src/service/backend_service.cpp | 42 ++++++++++++++++++++------------------ 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f047071139e..64083aee9c1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -241,6 +241,10 @@ DEFINE_mInt32(max_download_speed_kbps, "50000"); DEFINE_mInt32(download_low_speed_limit_kbps, "50"); // download low speed time(seconds) DEFINE_mInt32(download_low_speed_time, "300"); +// whether to check md5sum when download +DEFINE_mBool(enable_download_md5sum_check, "true"); +// download binlog meta timeout, default 30s +DEFINE_mInt32(download_binlog_meta_timeout_ms, "30000"); DEFINE_String(sys_log_dir, ""); DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 29080a56def..9d08cc78562 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -287,6 +287,10 @@ DECLARE_mInt32(max_download_speed_kbps); DECLARE_mInt32(download_low_speed_limit_kbps); // download low speed time(seconds) DECLARE_mInt32(download_low_speed_time); +// whether to check md5sum when download +DECLARE_mBool(enable_download_md5sum_check); +// download binlog meta timeout +DECLARE_mInt32(download_binlog_meta_timeout_ms); // deprecated, use env var LOG_DIR in be.conf DECLARE_String(sys_log_dir); diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 639701f068c..5d8eae4ca80 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -38,6 +38,7 @@ #include <unordered_map> #include <utility> +#include "common/config.h" #include "common/logging.h" #include "gutil/strings/split.h" #include "http/http_client.h" @@ -419,9 +420,9 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to Status SnapshotLoader::remote_http_download( const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots, std::vector<int64_t>* downloaded_tablet_ids) { - constexpr uint32_t kListRemoteFileTimeout = 15; + LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id, + _task_id); constexpr uint32_t kDownloadFileMaxRetry = 3; - constexpr uint32_t kGetLengthTimeout = 10; // check if job has already been cancelled int tmp_counter = 1; @@ -502,7 +503,7 @@ Status SnapshotLoader::remote_http_download( string file_list_str; auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) { RETURN_IF_ERROR(client->init(remote_url_prefix)); - client->set_timeout_ms(kListRemoteFileTimeout * 1000); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); return client->execute(&file_list_str); }; RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb)); @@ -518,12 +519,20 @@ Status SnapshotLoader::remote_http_download( uint64_t file_size = 0; std::string file_md5; auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) { - std::string url = fmt::format("{}&acquire_md5=true", remote_file_url); + int64_t timeout_ms = config::download_binlog_meta_timeout_ms; + std::string url = remote_file_url; + if (config::enable_download_md5sum_check) { + // compute md5sum is time-consuming, so we set a longer timeout + timeout_ms = config::download_binlog_meta_timeout_ms * 3; + url = fmt::format("{}&acquire_md5=true", remote_file_url); + } RETURN_IF_ERROR(client->init(url)); - client->set_timeout_ms(kGetLengthTimeout * 1000); + client->set_timeout_ms(timeout_ms); RETURN_IF_ERROR(client->head()); RETURN_IF_ERROR(client->get_content_length(&file_size)); - RETURN_IF_ERROR(client->get_content_md5(&file_md5)); + if (config::enable_download_md5sum_check) { + RETURN_IF_ERROR(client->get_content_md5(&file_md5)); + } return Status::OK(); }; RETURN_IF_ERROR( diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index a2a04db4c53..9c8080ddaad 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -84,7 +84,7 @@ class TTransportException; namespace doris { namespace { -constexpr uint64_t kMaxTimeoutMs = 3000; // 3s + struct IngestBinlogArg { int64_t txn_id; int64_t partition_id; @@ -150,6 +150,14 @@ void _ingest_binlog(IngestBinlogArg* arg) { tstatus.error_msgs.push_back(std::move(error_msg)); }; + auto estimate_download_timeout = [](int64_t file_size) { + uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; + if (estimate_timeout < config::download_low_speed_time) { + estimate_timeout = config::download_low_speed_time; + } + return estimate_timeout; + }; + // Step 3: get binlog info auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host, request.remote_port); @@ -161,7 +169,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { std::string binlog_info; auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) { RETURN_IF_ERROR(client->init(get_binlog_info_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); return client->execute(&binlog_info); }; auto status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb); @@ -200,7 +208,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { std::string rowset_meta_str; auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) { RETURN_IF_ERROR(client->init(get_rowset_meta_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); return client->execute(&rowset_meta_str); }; status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb); @@ -249,7 +257,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { auto get_segment_file_size_cb = [&get_segment_file_size_url, &segment_file_size](HttpClient* client) { RETURN_IF_ERROR(client->init(get_segment_file_size_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); RETURN_IF_ERROR(client->head()); return client->get_content_length(&segment_file_size); }; @@ -285,16 +293,11 @@ void _ingest_binlog(IngestBinlogArg* arg) { auto get_segment_file_url = fmt::format("{}&acquire_md5=true", segment_file_urls[segment_index]); - uint64_t estimate_timeout = - segment_file_size / config::download_low_speed_limit_kbps / 1024; - if (estimate_timeout < config::download_low_speed_time) { - estimate_timeout = config::download_low_speed_time; - } - auto local_segment_path = BetaRowset::segment_file_path( local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index); LOG(INFO) << fmt::format("download segment file from {} to {}", get_segment_file_url, local_segment_path); + uint64_t estimate_timeout = estimate_download_timeout(segment_file_size); auto get_segment_file_cb = [&get_segment_file_url, &local_segment_path, segment_file_size, estimate_timeout, &download_success_files](HttpClient* client) { RETURN_IF_ERROR(client->init(get_segment_file_url)); @@ -303,7 +306,9 @@ void _ingest_binlog(IngestBinlogArg* arg) { download_success_files.push_back(local_segment_path); std::string remote_file_md5; - RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + if (config::enable_download_md5sum_check) { + RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + } LOG(INFO) << "download segment file to " << local_segment_path << ", remote md5: " << remote_file_md5 << ", remote size: " << segment_file_size; @@ -378,7 +383,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { [&get_segment_index_file_size_url, &segment_index_file_size](HttpClient* client) { RETURN_IF_ERROR(client->init(get_segment_index_file_size_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); RETURN_IF_ERROR(client->head()); return client->get_content_length(&segment_index_file_size); }; @@ -414,7 +419,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { [&get_segment_index_file_size_url, &segment_index_file_size](HttpClient* client) { RETURN_IF_ERROR(client->init(get_segment_index_file_size_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); RETURN_IF_ERROR(client->head()); return client->get_content_length(&segment_index_file_size); }; @@ -461,12 +466,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { auto get_segment_index_file_url = fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]); - uint64_t estimate_timeout = - segment_index_file_size / config::download_low_speed_limit_kbps / 1024; - if (estimate_timeout < config::download_low_speed_time) { - estimate_timeout = config::download_low_speed_time; - } - + uint64_t estimate_timeout = estimate_download_timeout(segment_index_file_size); auto local_segment_index_path = segment_index_file_names[i]; LOG(INFO) << fmt::format("download segment index file from {} to {}", get_segment_index_file_url, local_segment_index_path); @@ -479,7 +479,9 @@ void _ingest_binlog(IngestBinlogArg* arg) { download_success_files.push_back(local_segment_index_path); std::string remote_file_md5; - RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + if (config::enable_download_md5sum_check) { + RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + } std::error_code ec; // Check file length --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org