This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a7935152e7b branch-3.0: [chore](binlog) Add download binlog related configs to BE #47412 (#47587)
a7935152e7b is described below

commit a7935152e7b2ff8ccc2693f149da85926009b39a
Author: walter <maoch...@selectdb.com>
AuthorDate: Wed Feb 19 19:21:38 2025 +0800

    branch-3.0: [chore](binlog) Add download binlog related configs to BE #47412 (#47587)
    
    cherry pick from #47412
---
 be/src/common/config.cpp           |  4 ++++
 be/src/common/config.h             |  4 ++++
 be/src/runtime/snapshot_loader.cpp | 22 ++++++++++++++------
 be/src/service/backend_service.cpp | 42 ++++++++++++++++++++------------------
 4 files changed, 46 insertions(+), 26 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f2e4ed95640..7b0079f0ce6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -256,6 +256,10 @@ DEFINE_mInt32(download_low_speed_limit_kbps, "50");
 DEFINE_mInt32(download_low_speed_time, "300");
 // whether to download small files in batch
 DEFINE_mBool(enable_batch_download, "false");
+// whether to check md5sum when download
+DEFINE_mBool(enable_download_md5sum_check, "true");
+// download binlog meta timeout, default 30s
+DEFINE_mInt32(download_binlog_meta_timeout_ms, "30000");
 
 DEFINE_String(sys_log_dir, "");
 DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 64777a532a4..7558b5286d9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -302,6 +302,10 @@ DECLARE_mInt32(download_low_speed_limit_kbps);
 DECLARE_mInt32(download_low_speed_time);
 // whether to download small files in batch.
 DECLARE_mBool(enable_batch_download);
+// whether to check md5sum when download
+DECLARE_mBool(enable_download_md5sum_check);
+// download binlog meta timeout
+DECLARE_mInt32(download_binlog_meta_timeout_ms);
 
 // deprecated, use env var LOG_DIR in be.conf
 DECLARE_String(sys_log_dir);
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index 97955d94051..d95bbb7f792 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -35,6 +35,7 @@
 #include <unordered_map>
 #include <utility>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "gutil/strings/split.h"
 #include "http/http_client.h"
@@ -412,9 +413,10 @@ Status SnapshotLoader::download(const 
std::map<std::string, std::string>& src_to
 Status SnapshotLoader::remote_http_download(
         const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
         std::vector<int64_t>* downloaded_tablet_ids) {
-    constexpr uint32_t kListRemoteFileTimeout = 15;
+    LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, 
task id: {}", _job_id,
+                             _task_id);
+
     constexpr uint32_t kDownloadFileMaxRetry = 3;
-    constexpr uint32_t kGetLengthTimeout = 10;
 
     // check if job has already been cancelled
     int tmp_counter = 1;
@@ -497,7 +499,7 @@ Status SnapshotLoader::remote_http_download(
         string file_list_str;
         auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* 
client) {
             RETURN_IF_ERROR(client->init(remote_url_prefix));
-            client->set_timeout_ms(kListRemoteFileTimeout * 1000);
+            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
             return client->execute(&file_list_str);
         };
         RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 
1, list_files_cb));
@@ -513,12 +515,20 @@ Status SnapshotLoader::remote_http_download(
             uint64_t file_size = 0;
             std::string file_md5;
             auto get_file_stat_cb = [&remote_file_url, &file_size, 
&file_md5](HttpClient* client) {
-                std::string url = fmt::format("{}&acquire_md5=true", 
remote_file_url);
+                int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
+                std::string url = remote_file_url;
+                if (config::enable_download_md5sum_check) {
+                    // compute md5sum is time-consuming, so we set a longer 
timeout
+                    timeout_ms = config::download_binlog_meta_timeout_ms * 3;
+                    url = fmt::format("{}&acquire_md5=true", remote_file_url);
+                }
                 RETURN_IF_ERROR(client->init(url));
-                client->set_timeout_ms(kGetLengthTimeout * 1000);
+                client->set_timeout_ms(timeout_ms);
                 RETURN_IF_ERROR(client->head());
                 RETURN_IF_ERROR(client->get_content_length(&file_size));
-                RETURN_IF_ERROR(client->get_content_md5(&file_md5));
+                if (config::enable_download_md5sum_check) {
+                    RETURN_IF_ERROR(client->get_content_md5(&file_md5));
+                }
                 return Status::OK();
             };
             RETURN_IF_ERROR(
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 6bb73f37b8c..817f7ffb914 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -90,7 +90,7 @@ class TTransportException;
 namespace doris {
 
 namespace {
-constexpr uint64_t kMaxTimeoutMs = 3000; // 3s
+
 struct IngestBinlogArg {
     int64_t txn_id;
     int64_t partition_id;
@@ -156,6 +156,14 @@ void _ingest_binlog(StorageEngine& engine, 
IngestBinlogArg* arg) {
         tstatus.error_msgs.push_back(std::move(error_msg));
     };
 
+    auto estimate_download_timeout = [](int64_t file_size) {
+        uint64_t estimate_timeout = file_size / 
config::download_low_speed_limit_kbps / 1024;
+        if (estimate_timeout < config::download_low_speed_time) {
+            estimate_timeout = config::download_low_speed_time;
+        }
+        return estimate_timeout;
+    };
+
     // Step 3: get binlog info
     auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download";, 
request.remote_host,
                                       request.remote_port);
@@ -167,7 +175,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
     std::string binlog_info;
     auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
         RETURN_IF_ERROR(client->init(get_binlog_info_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
+        client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
         return client->execute(&binlog_info);
     };
     auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_binlog_info_cb);
@@ -206,7 +214,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
     std::string rowset_meta_str;
     auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
         RETURN_IF_ERROR(client->init(get_rowset_meta_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
+        client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
         return client->execute(&rowset_meta_str);
     };
     status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
@@ -255,7 +263,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
         auto get_segment_file_size_cb = [&get_segment_file_size_url,
                                          &segment_file_size](HttpClient* 
client) {
             RETURN_IF_ERROR(client->init(get_segment_file_size_url));
-            client->set_timeout_ms(kMaxTimeoutMs);
+            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
             RETURN_IF_ERROR(client->head());
             return client->get_content_length(&segment_file_size);
         };
@@ -291,16 +299,11 @@ void _ingest_binlog(StorageEngine& engine, 
IngestBinlogArg* arg) {
         auto get_segment_file_url =
                 fmt::format("{}&acquire_md5=true", 
segment_file_urls[segment_index]);
 
-        uint64_t estimate_timeout =
-                segment_file_size / config::download_low_speed_limit_kbps / 
1024;
-        if (estimate_timeout < config::download_low_speed_time) {
-            estimate_timeout = config::download_low_speed_time;
-        }
-
         auto segment_path = local_segment_path(local_tablet->tablet_path(),
                                                
rowset_meta->rowset_id().to_string(), segment_index);
         LOG(INFO) << "download segment file from " << get_segment_file_url << 
" to "
                   << segment_path;
+        uint64_t estimate_timeout = 
estimate_download_timeout(segment_file_size);
         auto get_segment_file_cb = [&get_segment_file_url, &segment_path, 
segment_file_size,
                                     estimate_timeout, 
&download_success_files](HttpClient* client) {
             RETURN_IF_ERROR(client->init(get_segment_file_url));
@@ -309,7 +312,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
             download_success_files.push_back(segment_path);
 
             std::string remote_file_md5;
-            RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+            if (config::enable_download_md5sum_check) {
+                RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+            }
             LOG(INFO) << "download segment file to " << segment_path
                       << ", remote md5: " << remote_file_md5
                       << ", remote size: " << segment_file_size;
@@ -381,7 +386,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
                         [&get_segment_index_file_size_url,
                          &segment_index_file_size](HttpClient* client) {
                             
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
-                            client->set_timeout_ms(kMaxTimeoutMs);
+                            
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
                             RETURN_IF_ERROR(client->head());
                             return 
client->get_content_length(&segment_index_file_size);
                         };
@@ -420,7 +425,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
                         [&get_segment_index_file_size_url,
                          &segment_index_file_size](HttpClient* client) {
                             
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
-                            client->set_timeout_ms(kMaxTimeoutMs);
+                            
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
                             RETURN_IF_ERROR(client->head());
                             return 
client->get_content_length(&segment_index_file_size);
                         };
@@ -468,12 +473,7 @@ void _ingest_binlog(StorageEngine& engine, 
IngestBinlogArg* arg) {
         auto get_segment_index_file_url =
                 fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]);
 
-        uint64_t estimate_timeout =
-                segment_index_file_size / 
config::download_low_speed_limit_kbps / 1024;
-        if (estimate_timeout < config::download_low_speed_time) {
-            estimate_timeout = config::download_low_speed_time;
-        }
-
+        uint64_t estimate_timeout = 
estimate_download_timeout(segment_index_file_size);
         auto local_segment_index_path = segment_index_file_names[i];
         LOG(INFO) << fmt::format("download segment index file from {} to {}",
                                  get_segment_index_file_url, 
local_segment_index_path);
@@ -486,7 +486,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* 
arg) {
             download_success_files.push_back(local_segment_index_path);
 
             std::string remote_file_md5;
-            RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+            if (config::enable_download_md5sum_check) {
+                RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+            }
 
             std::error_code ec;
             // Check file length


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to