This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new a7935152e7b branch-3.0: [chore](binlog) Add download binlog related configs to BE #47412 (#47587) a7935152e7b is described below commit a7935152e7b2ff8ccc2693f149da85926009b39a Author: walter <maoch...@selectdb.com> AuthorDate: Wed Feb 19 19:21:38 2025 +0800 branch-3.0: [chore](binlog) Add download binlog related configs to BE #47412 (#47587) cherry pick from #47412 --- be/src/common/config.cpp | 4 ++++ be/src/common/config.h | 4 ++++ be/src/runtime/snapshot_loader.cpp | 22 ++++++++++++++------ be/src/service/backend_service.cpp | 42 ++++++++++++++++++++------------------ 4 files changed, 46 insertions(+), 26 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f2e4ed95640..7b0079f0ce6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -256,6 +256,10 @@ DEFINE_mInt32(download_low_speed_limit_kbps, "50"); DEFINE_mInt32(download_low_speed_time, "300"); // whether to download small files in batch DEFINE_mBool(enable_batch_download, "false"); +// whether to check md5sum when download +DEFINE_mBool(enable_download_md5sum_check, "true"); +// download binlog meta timeout, default 30s +DEFINE_mInt32(download_binlog_meta_timeout_ms, "30000"); DEFINE_String(sys_log_dir, ""); DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 64777a532a4..7558b5286d9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -302,6 +302,10 @@ DECLARE_mInt32(download_low_speed_limit_kbps); DECLARE_mInt32(download_low_speed_time); // whether to download small files in batch. DECLARE_mBool(enable_batch_download); +// whether to check md5sum when download +DECLARE_mBool(enable_download_md5sum_check); +// download binlog meta timeout +DECLARE_mInt32(download_binlog_meta_timeout_ms); // deprecated, use env var LOG_DIR in be.conf DECLARE_String(sys_log_dir); diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 97955d94051..d95bbb7f792 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -35,6 +35,7 @@ #include <unordered_map> #include <utility> +#include "common/config.h" #include "common/logging.h" #include "gutil/strings/split.h" #include "http/http_client.h" @@ -412,9 +413,10 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to Status SnapshotLoader::remote_http_download( const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots, std::vector<int64_t>* downloaded_tablet_ids) { - constexpr uint32_t kListRemoteFileTimeout = 15; + LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id, + _task_id); + constexpr uint32_t kDownloadFileMaxRetry = 3; - constexpr uint32_t kGetLengthTimeout = 10; // check if job has already been cancelled int tmp_counter = 1; @@ -497,7 +499,7 @@ Status SnapshotLoader::remote_http_download( string file_list_str; auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) { RETURN_IF_ERROR(client->init(remote_url_prefix)); - client->set_timeout_ms(kListRemoteFileTimeout * 1000); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); return client->execute(&file_list_str); }; RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_files_cb)); @@ -513,12 +515,20 @@ Status SnapshotLoader::remote_http_download( uint64_t file_size = 0; std::string file_md5; auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) { - std::string url = fmt::format("{}&acquire_md5=true", remote_file_url); + int64_t timeout_ms = config::download_binlog_meta_timeout_ms; + std::string url = remote_file_url; + if (config::enable_download_md5sum_check) { + // compute md5sum is time-consuming, so we set a longer timeout + timeout_ms = config::download_binlog_meta_timeout_ms * 3; + url = fmt::format("{}&acquire_md5=true", remote_file_url); + } RETURN_IF_ERROR(client->init(url)); - client->set_timeout_ms(kGetLengthTimeout * 1000); + client->set_timeout_ms(timeout_ms); RETURN_IF_ERROR(client->head()); RETURN_IF_ERROR(client->get_content_length(&file_size)); - RETURN_IF_ERROR(client->get_content_md5(&file_md5)); + if (config::enable_download_md5sum_check) { + RETURN_IF_ERROR(client->get_content_md5(&file_md5)); + } return Status::OK(); }; RETURN_IF_ERROR( diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 6bb73f37b8c..817f7ffb914 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -90,7 +90,7 @@ class TTransportException; namespace doris { namespace { -constexpr uint64_t kMaxTimeoutMs = 3000; // 3s + struct IngestBinlogArg { int64_t txn_id; int64_t partition_id; @@ -156,6 +156,14 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { tstatus.error_msgs.push_back(std::move(error_msg)); }; + auto estimate_download_timeout = [](int64_t file_size) { + uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024; + if (estimate_timeout < config::download_low_speed_time) { + estimate_timeout = config::download_low_speed_time; + } + return estimate_timeout; + }; + // Step 3: get binlog info auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host, request.remote_port); @@ -167,7 +175,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { std::string binlog_info; auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) { RETURN_IF_ERROR(client->init(get_binlog_info_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); return client->execute(&binlog_info); }; auto status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb); @@ -206,7 +214,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { std::string rowset_meta_str; auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) { RETURN_IF_ERROR(client->init(get_rowset_meta_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); return client->execute(&rowset_meta_str); }; status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb); @@ -255,7 +263,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { auto get_segment_file_size_cb = [&get_segment_file_size_url, &segment_file_size](HttpClient* client) { RETURN_IF_ERROR(client->init(get_segment_file_size_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); RETURN_IF_ERROR(client->head()); return client->get_content_length(&segment_file_size); }; @@ -291,16 +299,11 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { auto get_segment_file_url = fmt::format("{}&acquire_md5=true", segment_file_urls[segment_index]); - uint64_t estimate_timeout = - segment_file_size / config::download_low_speed_limit_kbps / 1024; - if (estimate_timeout < config::download_low_speed_time) { - estimate_timeout = config::download_low_speed_time; - } - auto segment_path = local_segment_path(local_tablet->tablet_path(), rowset_meta->rowset_id().to_string(), segment_index); LOG(INFO) << "download segment file from " << get_segment_file_url << " to " << segment_path; + uint64_t estimate_timeout = estimate_download_timeout(segment_file_size); auto get_segment_file_cb = [&get_segment_file_url, &segment_path, segment_file_size, estimate_timeout, &download_success_files](HttpClient* client) { RETURN_IF_ERROR(client->init(get_segment_file_url)); @@ -309,7 +312,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { download_success_files.push_back(segment_path); std::string remote_file_md5; - RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + if (config::enable_download_md5sum_check) { + RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + } LOG(INFO) << "download segment file to " << segment_path << ", remote md5: " << remote_file_md5 << ", remote size: " << segment_file_size; @@ -381,7 +386,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { [&get_segment_index_file_size_url, &segment_index_file_size](HttpClient* client) { RETURN_IF_ERROR(client->init(get_segment_index_file_size_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); RETURN_IF_ERROR(client->head()); return client->get_content_length(&segment_index_file_size); }; @@ -420,7 +425,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { [&get_segment_index_file_size_url, &segment_index_file_size](HttpClient* client) { RETURN_IF_ERROR(client->init(get_segment_index_file_size_url)); - client->set_timeout_ms(kMaxTimeoutMs); + client->set_timeout_ms(config::download_binlog_meta_timeout_ms); RETURN_IF_ERROR(client->head()); return client->get_content_length(&segment_index_file_size); }; @@ -468,12 +473,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { auto get_segment_index_file_url = fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]); - uint64_t estimate_timeout = - segment_index_file_size / config::download_low_speed_limit_kbps / 1024; - if (estimate_timeout < config::download_low_speed_time) { - estimate_timeout = config::download_low_speed_time; - } - + uint64_t estimate_timeout = estimate_download_timeout(segment_index_file_size); auto local_segment_index_path = segment_index_file_names[i]; LOG(INFO) << fmt::format("download segment index file from {} to {}", get_segment_index_file_url, local_segment_index_path); @@ -486,7 +486,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) { download_success_files.push_back(local_segment_index_path); std::string remote_file_md5; - RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + if (config::enable_download_md5sum_check) { + RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + } std::error_code ec; // Check file length --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org