This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4100a752379 branch-3.0: [feat](clone) Speed clone tablet via batch 
small file downloading #45061 (#45191)
4100a752379 is described below

commit 4100a752379b316dbe011287f25c743613a56fb8
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 10 12:31:04 2024 +0800

    branch-3.0: [feat](clone) Speed clone tablet via batch small file 
downloading #45061 (#45191)
    
    Cherry-picked from #45061
    
    Co-authored-by: walter <maoch...@selectdb.com>
---
 be/src/common/config.cpp                      |   2 +
 be/src/common/config.h                        |   2 +
 be/src/gutil/strings/stringpiece.h            |   6 +
 be/src/http/action/batch_download_action.cpp  | 216 ++++++++++++++++++++++
 be/src/http/action/batch_download_action.h    |  65 +++++++
 be/src/http/action/download_binlog_action.cpp |   3 -
 be/src/http/http_channel.cpp                  |  56 +++++-
 be/src/http/http_channel.h                    |   8 +
 be/src/http/http_client.cpp                   | 247 ++++++++++++++++++++++++--
 be/src/http/http_client.h                     |   5 +
 be/src/http/utils.cpp                         | 130 +++++++++++++-
 be/src/http/utils.h                           |  15 +-
 be/src/olap/task/engine_clone_task.cpp        | 149 ++++++++++++++--
 be/src/olap/task/engine_clone_task.h          |   3 +
 be/src/service/http_service.cpp               |  11 ++
 be/test/http/http_client_test.cpp             |  98 +++++++++-
 16 files changed, 975 insertions(+), 41 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1ecd668e428..a1d168e9802 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -232,6 +232,8 @@ DEFINE_mInt32(max_download_speed_kbps, "50000");
 DEFINE_mInt32(download_low_speed_limit_kbps, "50");
 // download low speed time(seconds)
 DEFINE_mInt32(download_low_speed_time, "300");
+// whether to download small files in batch
+DEFINE_mBool(enable_batch_download, "false");
 
 DEFINE_String(sys_log_dir, "");
 DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 10ac38d18bb..95b04b56a5c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -282,6 +282,8 @@ DECLARE_mInt32(max_download_speed_kbps);
 DECLARE_mInt32(download_low_speed_limit_kbps);
 // download low speed time(seconds)
 DECLARE_mInt32(download_low_speed_time);
+// whether to download small files in batch.
+DECLARE_mBool(enable_batch_download);
 
 // deprecated, use env var LOG_DIR in be.conf
 DECLARE_String(sys_log_dir);
diff --git a/be/src/gutil/strings/stringpiece.h 
b/be/src/gutil/strings/stringpiece.h
index 38e36a27099..7a4ebabbf09 100644
--- a/be/src/gutil/strings/stringpiece.h
+++ b/be/src/gutil/strings/stringpiece.h
@@ -149,6 +149,12 @@ public:
         assert(length <= static_cast<size_t>(std::numeric_limits<int>::max()));
         length_ = static_cast<int>(length);
     }
+    StringPiece(std::string_view view) // NOLINT(runtime/explicit)
+            : ptr_(view.data()), length_(0) {
+        size_t length = view.size();
+        assert(length <= static_cast<size_t>(std::numeric_limits<int>::max()));
+        length_ = static_cast<int>(length);
+    }
     StringPiece(const char* offset, int len) : ptr_(offset), length_(len) { 
assert(len >= 0); }
 
     // Substring of another StringPiece.
diff --git a/be/src/http/action/batch_download_action.cpp 
b/be/src/http/action/batch_download_action.cpp
new file mode 100644
index 00000000000..d486883e90b
--- /dev/null
+++ b/be/src/http/action/batch_download_action.cpp
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/batch_download_action.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "gutil/strings/split.h"
+#include "http/http_channel.h"
+#include "http/http_method.h"
+#include "http/http_request.h"
+#include "http/utils.h"
+#include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
+#include "util/security.h"
+
+namespace doris {
+namespace {
+const std::string CHECK_PARAMETER = "check";
+const std::string LIST_PARAMETER = "list";
+const std::string DIR_PARAMETER = "dir";
+const std::string TOKEN_PARAMETER = "token";
+} // namespace
+
+BatchDownloadAction::BatchDownloadAction(
+        ExecEnv* exec_env, std::shared_ptr<bufferevent_rate_limit_group> 
rate_limit_group,
+        const std::vector<std::string>& allow_dirs)
+        : HttpHandlerWithAuth(exec_env), 
_rate_limit_group(std::move(rate_limit_group)) {
+    for (const auto& dir : allow_dirs) {
+        std::string p;
+        Status st = io::global_local_filesystem()->canonicalize(dir, &p);
+        if (!st.ok()) {
+            continue;
+        }
+        _allow_paths.emplace_back(std::move(p));
+    }
+}
+
+void BatchDownloadAction::handle(HttpRequest* req) {
+    if (VLOG_CRITICAL_IS_ON) {
+        VLOG_CRITICAL << "accept one batch download request " << 
req->debug_string();
+    }
+
+    if (req->param(CHECK_PARAMETER) == "true") {
+        // Availability probe: clients pass check=true to test whether this 
API is supported; reply "OK" without doing any work.
+        HttpChannel::send_reply(req, "OK");
+        return;
+    }
+
+    // Get 'dir' parameter, then assembly file absolute path
+    const std::string& dir_path = req->param(DIR_PARAMETER);
+    if (dir_path.empty()) {
+        std::string error_msg =
+                std::string("parameter " + DIR_PARAMETER + " not specified in 
url.");
+        LOG(WARNING) << "handle batch download request: " << error_msg
+                     << ", url: " << mask_token(req->uri());
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg);
+        return;
+    }
+
+    if (dir_path.find("..") != std::string::npos) {
+        std::string error_msg = "Not allowed to read relative path: " + 
dir_path;
+        LOG(WARNING) << "handle batch download request: " << error_msg
+                     << ", url: " << mask_token(req->uri());
+        HttpChannel::send_reply(req, HttpStatus::FORBIDDEN, error_msg);
+        return;
+    }
+
+    Status status;
+    if (config::enable_token_check) {
+        status = _check_token(req);
+        if (!status.ok()) {
+            std::string error_msg = status.to_string();
+            if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
+                HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, 
error_msg);
+                return;
+            } else {
+                HttpChannel::send_reply(req, 
HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
+                return;
+            }
+        }
+    }
+
+    status = _check_path_is_allowed(dir_path);
+    if (!status.ok()) {
+        std::string error_msg = status.to_string();
+        if (status.is<ErrorCode::NOT_FOUND>() || 
status.is<ErrorCode::IO_ERROR>()) {
+            HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, error_msg);
+            return;
+        } else if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
+            HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
+            return;
+        } else {
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, 
error_msg);
+            return;
+        }
+    }
+
+    bool is_dir = false;
+    status = io::global_local_filesystem()->is_directory(dir_path, &is_dir);
+    if (!status.ok()) {
+        LOG(WARNING) << "handle batch download request: " << status.to_string()
+                     << ", url: " << mask_token(req->uri());
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, 
status.to_string());
+        return;
+    }
+
+    if (!is_dir) {
+        std::string error_msg = fmt::format("The requested path is not a 
directory: {}", dir_path);
+        LOG(WARNING) << "handle batch download request: " << error_msg
+                     << ", url: " << mask_token(req->uri());
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg);
+        return;
+    }
+
+    _handle(req, dir_path);
+
+    VLOG_CRITICAL << "deal with batch download request finished! ";
+}
+
+void BatchDownloadAction::_handle(HttpRequest* req, const std::string& 
dir_path) {
+    bool is_list_request = req->param(LIST_PARAMETER) == "true";
+    if (is_list_request) {
+        // return the list of files in the specified directory
+        bool is_acquire_filesize = true;
+        do_dir_response(dir_path, req, is_acquire_filesize);
+    } else {
+        _handle_batch_download(req, dir_path);
+    }
+}
+
+void BatchDownloadAction::_handle_batch_download(HttpRequest* req, const 
std::string& dir_path) {
+    std::vector<std::string> files =
+            strings::Split(req->get_request_body(), "\n", 
strings::SkipWhitespace());
+    if (files.empty()) {
+        std::string error_msg = "No file specified in request body.";
+        LOG(WARNING) << "handle batch download request: " << error_msg
+                     << ", url: " << mask_token(req->uri());
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg);
+        return;
+    }
+
+    if (files.size() > 64) {
+        std::string error_msg =
+                "The number of files to download in a batch should be less 
than 64.";
+        LOG(WARNING) << "handle batch download request: " << error_msg
+                     << ", url: " << mask_token(req->uri());
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg);
+        return;
+    }
+
+    for (const auto& file : files) {
+        if (file.find('/') != std::string::npos) {
+            std::string error_msg =
+                    fmt::format("Not allowed to read relative path: {}, dir: 
{}", file, dir_path);
+            LOG(WARNING) << "handle batch download request: " << error_msg
+                         << ", url: " << mask_token(req->uri());
+            HttpChannel::send_reply(req, HttpStatus::FORBIDDEN, error_msg);
+            return;
+        }
+    }
+
+    HttpChannel::send_files(req, dir_path, std::move(files));
+}
+
+Status BatchDownloadAction::_check_token(HttpRequest* req) {
+    const std::string& token_str = req->param(TOKEN_PARAMETER);
+    if (token_str.empty()) {
+        LOG(WARNING) << "token is not specified in request. url: " << 
mask_token(req->uri());
+        return Status::NotAuthorized("token is not specified.");
+    }
+
+    const std::string& local_token = _exec_env->token();
+    if (token_str != local_token) {
+        LOG(WARNING) << "invalid download token: " << mask_token(token_str)
+                     << ", local token: " << mask_token(local_token)
+                     << ", url: " << mask_token(req->uri());
+        return Status::NotAuthorized("invalid token {}", 
mask_token(token_str));
+    }
+
+    return Status::OK();
+}
+
+Status BatchDownloadAction::_check_path_is_allowed(const std::string& 
file_path) {
+    std::string canonical_file_path;
+    RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(file_path, 
&canonical_file_path));
+    for (auto& allow_path : _allow_paths) {
+        if (io::LocalFileSystem::contain_path(allow_path, 
canonical_file_path)) {
+            return Status::OK();
+        }
+    }
+
+    return Status::NotAuthorized("file path is not allowed: {}", 
canonical_file_path);
+}
+
+} // end namespace doris
diff --git a/be/src/http/action/batch_download_action.h 
b/be/src/http/action/batch_download_action.h
new file mode 100644
index 00000000000..f0b7e3576b9
--- /dev/null
+++ b/be/src/http/action/batch_download_action.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "http/http_handler.h"
+#include "http/http_handler_with_auth.h"
+#include "util/threadpool.h"
+
+struct bufferevent_rate_limit_group;
+
+namespace doris {
+
+class ExecEnv;
+class HttpRequest;
+
+// A simple handler that serves incoming HTTP requests of batching 
file-download to send their
+// respective HTTP responses.
+//
+// We use parameter named 'dir' to specify the static resource path, it is an 
absolute path.
+//
+// In a HEAD request, this handler returns the list of files in the 
specified directory.
+//
+// In a GET request, the file names to download are specified in the request 
body as a list of strings,
+// separated by '\n'. To avoid excessive resource usage, the maximum number 
of files to download in a batch is 64.
+class BatchDownloadAction : public HttpHandlerWithAuth {
+public:
+    BatchDownloadAction(ExecEnv* exec_env,
+                        std::shared_ptr<bufferevent_rate_limit_group> 
rate_limit_group,
+                        const std::vector<std::string>& allow_dirs);
+
+    ~BatchDownloadAction() override = default;
+
+    void handle(HttpRequest* req) override;
+
+private:
+    Status _check_token(HttpRequest* req);
+    Status _check_path_is_allowed(const std::string& path);
+
+    void _handle(HttpRequest* req, const std::string& dir_path);
+    void _handle_batch_download(HttpRequest* req, const std::string& dir_path);
+
+    std::vector<std::string> _allow_paths;
+    std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group;
+};
+
+} // end namespace doris
diff --git a/be/src/http/action/download_binlog_action.cpp 
b/be/src/http/action/download_binlog_action.cpp
index 54701c5e463..372f840401c 100644
--- a/be/src/http/action/download_binlog_action.cpp
+++ b/be/src/http/action/download_binlog_action.cpp
@@ -21,11 +21,9 @@
 #include <fmt/ranges.h>
 
 #include <cstdint>
-#include <limits>
 #include <stdexcept>
 #include <string_view>
 #include <utility>
-#include <vector>
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -34,7 +32,6 @@
 #include "http/utils.h"
 #include "io/fs/local_file_system.h"
 #include "olap/storage_engine.h"
-#include "olap/tablet.h"
 #include "olap/tablet_manager.h"
 #include "runtime/exec_env.h"
 
diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp
index 96679195316..312f1ab9286 100644
--- a/be/src/http/http_channel.cpp
+++ b/be/src/http/http_channel.cpp
@@ -20,8 +20,8 @@
 #include <event2/buffer.h>
 #include <event2/bufferevent.h>
 #include <event2/http.h>
+#include <event2/http_struct.h>
 
-#include <algorithm>
 #include <sstream>
 #include <string>
 #include <vector>
@@ -57,7 +57,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus 
status) {
 }
 
 void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const 
std::string& content) {
-    auto evb = evbuffer_new();
+    auto* evb = evbuffer_new();
     std::string compressed_content;
     if (compress_content(request->header(HttpHeaders::ACCEPT_ENCODING), 
content,
                          &compressed_content)) {
@@ -72,7 +72,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus 
status, const std:
 
 void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t 
size,
                             bufferevent_rate_limit_group* rate_limit_group) {
-    auto evb = evbuffer_new();
+    auto* evb = evbuffer_new();
     evbuffer_add_file(evb, fd, off, size);
     auto* evhttp_request = request->get_evhttp_request();
     if (rate_limit_group) {
@@ -84,6 +84,56 @@ void HttpChannel::send_file(HttpRequest* request, int fd, 
size_t off, size_t siz
     evbuffer_free(evb);
 }
 
+void HttpChannel::send_files(HttpRequest* request, const std::string& root_dir,
+                             std::vector<std::string> local_files,
+                             bufferevent_rate_limit_group* rate_limit_group) {
+    if (rate_limit_group) {
+        auto* evhttp_request = request->get_evhttp_request();
+        auto* evhttp_connection = 
evhttp_request_get_connection(evhttp_request);
+        auto* buffer_event = 
evhttp_connection_get_bufferevent(evhttp_connection);
+        bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
+    }
+
+    send_files(request, root_dir, std::move(local_files));
+}
+
+void HttpChannel::send_files(HttpRequest* request, const std::string& root_dir,
+                             std::vector<std::string> local_files) {
+    std::unique_ptr<evbuffer, decltype(&evbuffer_free)> evb(evbuffer_new(), 
&evbuffer_free);
+    for (const std::string& file : local_files) {
+        std::string file_path = fmt::format("{}/{}", root_dir, file);
+        int fd = open(file_path.c_str(), O_RDONLY);
+        if (fd < 0) {
+            std::string error_msg = "Failed to open file: " + file_path;
+            LOG(WARNING) << "http channel send files: " << error_msg;
+            HttpChannel::send_reply(request, HttpStatus::NOT_FOUND, error_msg);
+            return;
+        }
+        struct stat st;
+        auto res = fstat(fd, &st);
+        if (res < 0) {
+            close(fd);
+            std::string error_msg = "Failed to open file: " + file_path;
+            LOG(WARNING) << "http channel send files: " << error_msg;
+            HttpChannel::send_reply(request, HttpStatus::NOT_FOUND, error_msg);
+            return;
+        }
+
+        int64_t file_size = st.st_size;
+        VLOG_DEBUG << "http channel send file " << file_path << ", size: " << 
file_size;
+
+        evbuffer_add_printf(evb.get(), "File-Name: %s\r\n", file.c_str());
+        evbuffer_add_printf(evb.get(), "Content-Length: %ld\r\n", file_size);
+        evbuffer_add_printf(evb.get(), "\r\n");
+        if (file_size > 0) {
+            evbuffer_add_file(evb.get(), fd, 0, file_size);
+        }
+    }
+
+    evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK,
+                      default_reason(HttpStatus::OK).c_str(), evb.get());
+}
+
 bool HttpChannel::compress_content(const std::string& accept_encoding, const 
std::string& input,
                                    std::string* output) {
     // Don't bother compressing empty content.
diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h
index ee1e6c0888f..0d5e5d4260a 100644
--- a/be/src/http/http_channel.h
+++ b/be/src/http/http_channel.h
@@ -20,6 +20,7 @@
 #include <stddef.h>
 
 #include <string>
+#include <vector>
 
 #include "http/http_status.h"
 
@@ -47,6 +48,13 @@ public:
     static void send_file(HttpRequest* request, int fd, size_t off, size_t 
size,
                           bufferevent_rate_limit_group* rate_limit_group = 
nullptr);
 
+    static void send_files(HttpRequest* request, const std::string& root_dir,
+                           std::vector<std::string> local_files,
+                           bufferevent_rate_limit_group* rate_limit_group);
+
+    static void send_files(HttpRequest* request, const std::string& root_dir,
+                           std::vector<std::string> local_files);
+
     static bool compress_content(const std::string& accept_encoding, const 
std::string& input,
                                  std::string* output);
 };
diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp
index fc4c997fce8..767377cea3f 100644
--- a/be/src/http/http_client.cpp
+++ b/be/src/http/http_client.cpp
@@ -24,14 +24,225 @@
 #include <ostream>
 
 #include "common/config.h"
+#include "common/status.h"
 #include "http/http_headers.h"
-#include "http/http_status.h"
 #include "runtime/exec_env.h"
 #include "util/security.h"
 #include "util/stack_util.h"
 
 namespace doris {
 
+class MultiFileSplitter {
+public:
+    MultiFileSplitter(std::string local_dir, std::unordered_set<std::string> 
expected_files)
+            : _local_dir_path(std::move(local_dir)), 
_expected_files(std::move(expected_files)) {}
+    ~MultiFileSplitter() {
+        if (_fd >= 0) {
+            close(_fd);
+        }
+
+        if (!_status.ok() && !downloaded_files.empty()) {
+            LOG(WARNING) << "download files to " << _local_dir_path << " 
failed, try remove the "
+                         << downloaded_files.size() << " downloaded files";
+            for (const auto& file : downloaded_files) {
+                remove(file.c_str());
+            }
+        }
+    }
+
+    bool append(const char* data, size_t length) {
+        // Already failed.
+        if (!_status.ok()) {
+            return false;
+        }
+
+        std::string buf;
+        if (!_buffer.empty()) {
+            buf.swap(_buffer);
+            buf.append(data, length);
+            data = buf.data();
+            length = buf.size();
+        }
+        return append_inner(data, length);
+    }
+
+    Status finish() {
+        if (_status.ok()) {
+            _status = finish_inner();
+        }
+
+        return _status;
+    }
+
+private:
+    bool append_inner(const char* data, size_t length) {
+        while (length > 0) {
+            int consumed = 0;
+            if (_is_reading_header) {
+                consumed = parse_header(data, length);
+            } else {
+                consumed = append_file(data, length);
+            }
+
+            if (consumed < 0) {
+                return false;
+            }
+
+            DCHECK(consumed <= length);
+            data += consumed;
+            length -= consumed;
+        }
+        return true;
+    }
+
+    int parse_header(const char* data, size_t length) {
+        DCHECK(_fd < 0);
+
+        std::string_view buf(data, length);
+        size_t pos = buf.find("\r\n\r\n");
+        if (pos == std::string::npos) {
+            _buffer.append(data, length);
+            return static_cast<int>(length);
+        }
+
+        // header already read.
+        _is_reading_header = false;
+
+        bool has_file_name = false;
+        bool has_file_size = false;
+        std::string_view header = buf.substr(0, pos);
+        std::vector<std::string> headers =
+                strings::Split(header, "\r\n", strings::SkipWhitespace());
+        for (auto& s : headers) {
+            size_t header_pos = s.find(':');
+            if (header_pos == std::string::npos) {
+                continue;
+            }
+            std::string_view header_view(s);
+            std::string_view key = header_view.substr(0, header_pos);
+            std::string_view value = header_view.substr(header_pos + 1);
+            if (value.starts_with(' ')) {
+                value.remove_prefix(std::min(value.find_first_not_of(' '), 
value.size()));
+            }
+            if (key == "File-Name") {
+                _file_name = value;
+                has_file_name = true;
+            } else if (key == "Content-Length") {
+                auto res = std::from_chars(value.data(), value.data() + 
value.size(), _file_size);
+                if (res.ec != std::errc()) {
+                    std::string error_msg = fmt::format("invalid content 
length: {}", value);
+                    LOG(WARNING) << "download files to " << _local_dir_path
+                                 << "failed, err=" << error_msg;
+                    _status = Status::HttpError(std::move(error_msg));
+                    return -1;
+                }
+                has_file_size = true;
+            }
+        }
+
+        if (!has_file_name || !has_file_size) {
+            std::string error_msg =
+                    fmt::format("invalid multi part header, has file name: {}, 
has file size: {}",
+                                has_file_name, has_file_size);
+            LOG(WARNING) << "download files to " << _local_dir_path << 
"failed, err=" << error_msg;
+            _status = Status::HttpError(std::move(error_msg));
+            return -1;
+        }
+
+        if (!_expected_files.contains(_file_name)) {
+            std::string error_msg = fmt::format("unexpected file: {}", 
_file_name);
+            LOG(WARNING) << "download files to " << _local_dir_path << 
"failed, err=" << error_msg;
+            _status = Status::HttpError(std::move(error_msg));
+            return -1;
+        }
+
+        VLOG_DEBUG << "receive file " << _file_name << ", size " << _file_size;
+
+        _written_size = 0;
+        _local_file_path = fmt::format("{}/{}", _local_dir_path, _file_name);
+        _fd = open(_local_file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 
0644);
+        if (_fd < 0) {
+            std::string error_msg = "fail to open file to write: " + 
_local_file_path;
+            LOG(WARNING) << "download files to " << _local_dir_path << 
"failed, err=" << error_msg;
+            _status = Status::IOError(std::move(error_msg));
+            return -1;
+        }
+        downloaded_files.push_back(_local_file_path);
+
+        return static_cast<int>(pos + 4);
+    }
+
+    int append_file(const char* data, size_t length) {
+        DCHECK(_fd >= 0);
+        DCHECK(_file_size >= _written_size);
+
+        size_t write_size = std::min(length, _file_size - _written_size);
+        if (write_size > 0 && write(_fd, data, write_size) < 0) {
+            auto msg = fmt::format("write file failed, file={}, error={}", 
_local_file_path,
+                                   strerror(errno));
+            LOG(WARNING) << "download files to " << _local_dir_path << 
"failed, err=" << msg;
+            _status = Status::HttpError(std::move(msg));
+            return -1;
+        }
+
+        _written_size += write_size;
+        if (_written_size == _file_size) {
+            // This file has been downloaded, switch to the next one.
+            switchToNextFile();
+        }
+
+        return write_size;
+    }
+
+    Status finish_inner() {
+        if (!_is_reading_header && _written_size == _file_size) {
+            switchToNextFile();
+        }
+
+        if (_fd >= 0) {
+            // This file is not completely downloaded.
+            close(_fd);
+            _fd = -1;
+            auto error_msg = fmt::format("file {} is not completely 
downloaded", _local_file_path);
+            LOG(WARNING) << "download files to " << _local_dir_path << 
"failed, err=" << error_msg;
+            return Status::HttpError(std::move(error_msg));
+        }
+
+        if (!_expected_files.empty()) {
+            auto error_msg = fmt::format("not all files are downloaded, {} 
missing files",
+                                         _expected_files.size());
+            LOG(WARNING) << "download files to " << _local_dir_path << 
"failed, err=" << error_msg;
+            return Status::HttpError(std::move(error_msg));
+        }
+
+        downloaded_files.clear();
+        return Status::OK();
+    }
+
+    void switchToNextFile() {
+        DCHECK(_fd >= 0);
+        DCHECK(_written_size == _file_size);
+
+        close(_fd);
+        _fd = -1;
+        _expected_files.erase(_file_name);
+        _is_reading_header = true;
+    }
+
+    const std::string _local_dir_path;
+    std::string _buffer;
+    std::unordered_set<std::string> _expected_files;
+    Status _status;
+
+    bool _is_reading_header = true;
+    int _fd = -1;
+    std::string _local_file_path;
+    std::string _file_name;
+    size_t _file_size = 0;
+    size_t _written_size = 0;
+    std::vector<std::string> downloaded_files;
+};
+
 static const char* header_error_msg(CURLHcode code) {
     switch (code) {
     case CURLHE_OK:
@@ -174,6 +385,12 @@ void HttpClient::set_method(HttpMethod method) {
     }
 }
 
+void HttpClient::set_speed_limit() {
+    curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_LIMIT, 
config::download_low_speed_limit_kbps * 1024);
+    curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_TIME, 
config::download_low_speed_time);
+    curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, 
config::max_download_speed_kbps * 1024);
+}
+
 size_t HttpClient::on_response_data(const void* data, size_t length) {
     if (*_callback != nullptr) {
         bool is_continue = (*_callback)(data, length);
@@ -184,12 +401,6 @@ size_t HttpClient::on_response_data(const void* data, 
size_t length) {
     return length;
 }
 
-// Status HttpClient::execute_post_request(const std::string& post_data, const 
std::function<bool(const void* data, size_t length)>& callback = {}) {
-//     _callback = &callback;
-//     set_post_body(post_data);
-//     return execute(callback);
-// }
-
 Status HttpClient::execute_post_request(const std::string& payload, 
std::string* response) {
     set_method(POST);
     set_payload(payload);
@@ -234,14 +445,8 @@ Status HttpClient::get_content_md5(std::string* md5) const 
{
 }
 
 Status HttpClient::download(const std::string& local_path) {
-    // set method to GET
     set_method(GET);
-
-    // TODO(zc) Move this download speed limit outside to limit download speed
-    // at system level
-    curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_LIMIT, 
config::download_low_speed_limit_kbps * 1024);
-    curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_TIME, 
config::download_low_speed_time);
-    curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, 
config::max_download_speed_kbps * 1024);
+    set_speed_limit();
 
     auto fp_closer = [](FILE* fp) { fclose(fp); };
     std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(local_path.c_str(), 
"w"), fp_closer);
@@ -270,6 +475,20 @@ Status HttpClient::download(const std::string& local_path) 
{
     return status;
 }
 
+Status HttpClient::download_multi_files(const std::string& local_dir,
+                                        const std::unordered_set<std::string>& 
expected_files) {
+    set_speed_limit();
+
+    MultiFileSplitter splitter(local_dir, expected_files);
+    auto callback = [&](const void* data, size_t length) {
+        return splitter.append(reinterpret_cast<const char*>(data), length);
+    };
+    if (auto s = execute(callback); !s.ok()) {
+        return s;
+    }
+    return splitter.finish();
+}
+
 Status HttpClient::execute(std::string* response) {
     auto callback = [response](const void* data, size_t length) {
         response->append((char*)data, length);
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index c0c8863a9b0..a6f2f4fdff5 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -24,6 +24,7 @@
 #include <cstdio>
 #include <functional>
 #include <string>
+#include <unordered_set>
 
 #include "common/status.h"
 #include "http/http_headers.h"
@@ -81,6 +82,8 @@ public:
         curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, 0L);
     }
 
+    void set_speed_limit();
+
     // TODO(zc): support set header
     // void set_header(const std::string& key, const std::string& value) {
     // _cntl.http_request().SetHeader(key, value);
@@ -141,6 +144,8 @@ public:
     // helper function to download a file, you can call this function to 
download
     // a file to local_path
     Status download(const std::string& local_path);
+    Status download_multi_files(const std::string& local_dir,
+                                const std::unordered_set<std::string>& 
expected_files);
 
     Status execute_post_request(const std::string& payload, std::string* 
response);
 
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index f91610476b4..ee7a78113e5 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -23,6 +23,8 @@
 #include <unistd.h>
 
 #include <ostream>
+#include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "common/config.h"
@@ -30,6 +32,7 @@
 #include "common/status.h"
 #include "common/utils.h"
 #include "http/http_channel.h"
+#include "http/http_client.h"
 #include "http/http_common.h"
 #include "http/http_headers.h"
 #include "http/http_method.h"
@@ -41,10 +44,15 @@
 #include "runtime/exec_env.h"
 #include "util/md5.h"
 #include "util/path_util.h"
+#include "util/security.h"
 #include "util/url_coding.h"
 
 namespace doris {
 
+const uint32_t CHECK_SUPPORT_TIMEOUT = 3;
+const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
+const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15;
+
 std::string encode_basic_auth(const std::string& user, const std::string& 
passwd) {
     std::string auth = user + ":" + passwd;
     std::string encoded_auth;
@@ -190,20 +198,26 @@ void do_file_response(const std::string& file_path, 
HttpRequest* req,
     HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group);
 }
 
-void do_dir_response(const std::string& dir_path, HttpRequest* req) {
+void do_dir_response(const std::string& dir_path, HttpRequest* req, bool 
is_acquire_filesize) {
     bool exists = true;
     std::vector<io::FileInfo> files;
     Status st = io::global_local_filesystem()->list(dir_path, true, &files, 
&exists);
     if (!st.ok()) {
         LOG(WARNING) << "Failed to scan dir. " << st;
         HttpChannel::send_error(req, HttpStatus::INTERNAL_SERVER_ERROR);
+        return;
     }
 
+    VLOG_DEBUG << "list dir: " << dir_path << ", file count: " << files.size();
+
     const std::string FILE_DELIMITER_IN_DIR_RESPONSE = "\n";
 
     std::stringstream result;
     for (auto& file : files) {
         result << file.file_name << FILE_DELIMITER_IN_DIR_RESPONSE;
+        if (is_acquire_filesize) {
+            result << file.file_size << FILE_DELIMITER_IN_DIR_RESPONSE;
+        }
     }
 
     std::string result_str = result.str();
@@ -221,4 +235,118 @@ bool load_size_smaller_than_wal_limit(int64_t 
content_length) {
     return (content_length < 0.8 * max_available_size);
 }
 
+Status is_support_batch_download(const std::string& endpoint) {
+    std::string url = 
fmt::format("http://{}/api/_tablet/_batch_download?check=true", endpoint);
+    auto check_support_cb = [&url](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(url));
+        client->set_timeout_ms(CHECK_SUPPORT_TIMEOUT * 1000);
+        client->set_method(HttpMethod::HEAD);
+        std::string response;
+        return client->execute(&response);
+    };
+    return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, 
check_support_cb);
+}
+
+Status list_remote_files_v2(const std::string& address, const std::string& 
token,
+                            const std::string& remote_dir,
+                            std::vector<std::pair<std::string, size_t>>* 
file_info_list) {
+    std::string remote_url =
+            
fmt::format("http://{}/api/_tablet/_batch_download?token={}&dir={}&list=true", 
address,
+                        token, remote_dir);
+
+    std::string file_list_str;
+    auto list_files_cb = [&](HttpClient* client) {
+        file_list_str.clear();
+        RETURN_IF_ERROR(client->init(remote_url, false));
+        client->set_method(HttpMethod::GET);
+        client->set_timeout_ms(LIST_REMOTE_FILE_TIMEOUT * 1000);
+        return client->execute(&file_list_str);
+    };
+    Status status = HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, 
list_files_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to list remote files from " << remote_url
+                     << ", status: " << status.to_string() << ", response: " 
<< file_list_str;
+        return status;
+    }
+
+    std::vector<string> file_list = strings::Split(file_list_str, "\n", 
strings::SkipWhitespace());
+    if (file_list.size() % 2 != 0) {
+        return Status::InternalError("batch download files: invalid file list, 
size is not even");
+    }
+
+    VLOG_DEBUG << "list remote files from " << remote_url
+               << ", file count: " << file_list.size() / 2;
+
+    for (size_t i = 0; i < file_list.size(); i += 2) {
+        uint64_t file_size = 0;
+        try {
+            file_size = std::stoull(file_list[i + 1]);
+        } catch (std::exception&) {
+            return Status::InternalError("batch download files: invalid file 
size format: " +
+                                         file_list[i + 1]);
+        }
+        file_info_list->emplace_back(std::move(file_list[i]), file_size);
+    }
+
+    return Status::OK();
+}
+
+Status download_files_v2(const std::string& address, const std::string& token,
+                         const std::string& remote_dir, const std::string& 
local_dir,
+                         const std::vector<std::pair<std::string, size_t>>& 
file_info_list) {
+    std::string remote_url = 
fmt::format("http://{}/api/_tablet/_batch_download?dir={}&token={}",
+                                         address, remote_dir, token);
+
+    size_t batch_file_size = 0;
+    std::unordered_set<std::string> expected_files;
+    std::stringstream ss;
+    for (const auto& file_info : file_info_list) {
+        ss << file_info.first << "\n";
+        batch_file_size += file_info.second;
+        expected_files.insert(file_info.first);
+    }
+    std::string payload = ss.str();
+
+    uint64_t estimate_timeout = batch_file_size / 
config::download_low_speed_limit_kbps / 1024;
+    if (estimate_timeout < config::download_low_speed_time) {
+        estimate_timeout = config::download_low_speed_time;
+    }
+
+    LOG(INFO) << "begin to download files from " << remote_url << " to " << 
local_dir
+              << ", file count: " << file_info_list.size() << ", total size: " 
<< batch_file_size
+              << ", timeout: " << estimate_timeout;
+
+    auto callback = [&](HttpClient* client) -> Status {
+        RETURN_IF_ERROR(client->init(remote_url, false));
+        client->set_method(HttpMethod::POST);
+        client->set_payload(payload);
+        client->set_timeout_ms(estimate_timeout * 1000);
+        RETURN_IF_ERROR(client->download_multi_files(local_dir, 
expected_files));
+        for (auto&& [file_name, file_size] : file_info_list) {
+            std::string local_file_path = local_dir + "/" + file_name;
+
+            std::error_code ec;
+            // Check file length
+            uint64_t local_file_size = 
std::filesystem::file_size(local_file_path, ec);
+            if (ec) {
+                LOG(WARNING) << "download file error: " << ec.message();
+                return Status::IOError("can't retrive file_size of {}, due to 
{}", local_file_path,
+                                       ec.message());
+            }
+            if (local_file_size != file_size) {
+                LOG(WARNING) << "download file length error"
+                             << ", remote_path=" << mask_token(remote_url)
+                             << ", file_name=" << file_name << ", file_size=" 
<< file_size
+                             << ", local_file_size=" << local_file_size;
+                return Status::InternalError("downloaded file size is not 
equal");
+            }
+            RETURN_IF_ERROR(io::global_local_filesystem()->permission(
+                    local_file_path, io::LocalFileSystem::PERMS_OWNER_RW));
+        }
+
+        return Status::OK();
+    };
+    return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, 
callback);
+}
+
 } // namespace doris
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index 20be6c0fcd7..b9abb7c6208 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -40,9 +40,22 @@ void do_file_response(const std::string& dir_path, 
HttpRequest* req,
                       bufferevent_rate_limit_group* rate_limit_group = nullptr,
                       bool is_acquire_md5 = false);
 
-void do_dir_response(const std::string& dir_path, HttpRequest* req);
+void do_dir_response(const std::string& dir_path, HttpRequest* req,
+                     bool is_acquire_filesize = false);
 
 std::string get_content_type(const std::string& file_name);
 
 bool load_size_smaller_than_wal_limit(int64_t content_length);
+
+// Whether a backend supports batch download
+Status is_support_batch_download(const std::string& address);
+
+Status list_remote_files_v2(const std::string& address, const std::string& 
token,
+                            const std::string& remote_dir,
+                            std::vector<std::pair<std::string, size_t>>* 
file_info_list);
+
+Status download_files_v2(const std::string& address, const std::string& token,
+                         const std::string& remote_dir, const std::string& 
local_dir,
+                         const std::vector<std::pair<std::string, size_t>>& 
file_info_list);
+
 } // namespace doris
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index bea1d3b1a91..fa8d9b8248e 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -44,6 +44,7 @@
 #include "gutil/strings/split.h"
 #include "gutil/strings/strip.h"
 #include "http/http_client.h"
+#include "http/utils.h"
 #include "io/fs/file_system.h"
 #include "io/fs/local_file_system.h"
 #include "io/fs/path.h"
@@ -399,28 +400,62 @@ Status 
EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
                         .error(st);
             }
         }};
-        std::string remote_url_prefix;
+
+        std::string remote_dir;
         {
             std::stringstream ss;
             if (snapshot_path->back() == '/') {
-                ss << "http://" << get_host_port(src.host, src.http_port) << 
HTTP_REQUEST_PREFIX
-                   << HTTP_REQUEST_TOKEN_PARAM << token << 
HTTP_REQUEST_FILE_PARAM << *snapshot_path
-                   << _clone_req.tablet_id << "/" << _clone_req.schema_hash << 
"/";
+                ss << *snapshot_path << _clone_req.tablet_id << "/" << 
_clone_req.schema_hash
+                   << "/";
             } else {
-                ss << "http://" << get_host_port(src.host, src.http_port) << 
HTTP_REQUEST_PREFIX
-                   << HTTP_REQUEST_TOKEN_PARAM << token << 
HTTP_REQUEST_FILE_PARAM << *snapshot_path
-                   << "/" << _clone_req.tablet_id << "/" << 
_clone_req.schema_hash << "/";
+                ss << *snapshot_path << "/" << _clone_req.tablet_id << "/" << 
_clone_req.schema_hash
+                   << "/";
             }
-            remote_url_prefix = ss.str();
+            remote_dir = ss.str();
         }
 
-        status = _download_files(&data_dir, remote_url_prefix, 
local_data_path);
-        if (!status.ok()) [[unlikely]] {
-            LOG_WARNING("failed to download snapshot from remote BE")
-                    .tag("url", mask_token(remote_url_prefix))
-                    .error(status);
-            continue; // Try another BE
+        std::string address = get_host_port(src.host, src.http_port);
+        if (config::enable_batch_download && 
is_support_batch_download(address).ok()) {
+            // download files via batch api.
+            LOG_INFO("remote BE supports batch download, use batch file 
download")
+                    .tag("address", address)
+                    .tag("remote_dir", remote_dir);
+            status = _batch_download_files(&data_dir, address, remote_dir, 
local_data_path);
+            if (!status.ok()) [[unlikely]] {
+                LOG_WARNING("failed to download snapshot from remote BE in 
batch")
+                        .tag("address", address)
+                        .tag("remote_dir", remote_dir)
+                        .error(status);
+                continue; // Try another BE
+            }
+        } else {
+            if (config::enable_batch_download) {
+                LOG_INFO("remote BE does not support batch download, use 
single file download")
+                        .tag("address", address)
+                        .tag("remote_dir", remote_dir);
+            } else {
+                LOG_INFO("batch download is disabled, use single file 
download")
+                        .tag("address", address)
+                        .tag("remote_dir", remote_dir);
+            }
+
+            std::string remote_url_prefix;
+            {
+                std::stringstream ss;
+                ss << "http://" << address << HTTP_REQUEST_PREFIX << 
HTTP_REQUEST_TOKEN_PARAM
+                   << token << HTTP_REQUEST_FILE_PARAM << remote_dir;
+                remote_url_prefix = ss.str();
+            }
+
+            status = _download_files(&data_dir, remote_url_prefix, 
local_data_path);
+            if (!status.ok()) [[unlikely]] {
+                LOG_WARNING("failed to download snapshot from remote BE")
+                        .tag("url", mask_token(remote_url_prefix))
+                        .error(status);
+                continue; // Try another BE
+            }
         }
+
         // No need to try again with another BE
         _pending_rs_guards = 
DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids(
                 local_data_path, _clone_req.tablet_id, _clone_req.replica_id, 
_clone_req.table_id,
@@ -514,7 +549,7 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, 
const std::string& re
     // If the header file is not exist, the table couldn't loaded by olap 
engine.
     // Avoid of data is not complete, we copy the header file at last.
     // The header file's name is end of .hdr.
-    for (int i = 0; i < file_name_list.size() - 1; ++i) {
+    for (int i = 0; i + 1 < file_name_list.size(); ++i) {
         if (file_name_list[i].ends_with(".hdr")) {
             std::swap(file_name_list[i], file_name_list[file_name_list.size() 
- 1]);
             break;
@@ -593,13 +628,91 @@ Status EngineCloneTask::_download_files(DataDir* 
data_dir, const std::string& re
     }
     _copy_size = (int64_t)total_file_size;
     _copy_time_ms = (int64_t)total_time_ms;
-    LOG(INFO) << "succeed to copy tablet " << _signature << ", total file 
size: " << total_file_size
-              << " B"
-              << ", cost: " << total_time_ms << " ms"
+    LOG(INFO) << "succeed to copy tablet " << _signature
+              << ", total files: " << file_name_list.size()
+              << ", total file size: " << total_file_size << " B, cost: " << 
total_time_ms << " ms"
               << ", rate: " << copy_rate << " MB/s";
     return Status::OK();
 }
 
+Status EngineCloneTask::_batch_download_files(DataDir* data_dir, const 
std::string& address,
+                                              const std::string& remote_dir,
+                                              const std::string& local_dir) {
+    constexpr size_t BATCH_FILE_SIZE = 64 << 20; // 64MB
+    constexpr size_t BATCH_FILE_NUM = 64;
+
+    // Check local path exist, if exist, remove it, then create the dir
+    // local_file_full_path = tabletid/clone, for a specific tablet, there 
should be only one folder
+    // if this folder exists, then should remove it
+    // for example, BE clone from BE 1 to download file 1 with version (2,2), 
but clone from BE 1 failed
+    // then it will try to clone from BE 2, but it will find the file 1 
already exist, but file 1 with same
+    // name may have different versions.
+    
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(local_dir));
+    
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(local_dir));
+
+    const std::string& token = _cluster_info->token;
+    std::vector<std::pair<std::string, size_t>> file_info_list;
+    RETURN_IF_ERROR(list_remote_files_v2(address, token, remote_dir, 
&file_info_list));
+
+    // If the header file is not exist, the table couldn't loaded by olap 
engine.
+    // Avoid of data is not complete, we copy the header file at last.
+    // The header file's name is end of .hdr.
+    for (int i = 0; i + 1 < file_info_list.size(); ++i) {
+        if (file_info_list[i].first.ends_with(".hdr")) {
+            std::swap(file_info_list[i], file_info_list[file_info_list.size() 
- 1]);
+            break;
+        }
+    }
+
+    MonotonicStopWatch watch;
+    watch.start();
+
+    size_t total_file_size = 0;
+    size_t total_files = file_info_list.size();
+    std::vector<std::pair<std::string, size_t>> batch_files;
+    for (size_t i = 0; i < total_files;) {
+        size_t batch_file_size = 0;
+        for (size_t j = i; j < total_files; j++) {
+            // Split batchs by file number and file size,
+            if (BATCH_FILE_NUM <= batch_files.size() || BATCH_FILE_SIZE <= 
batch_file_size ||
+                // ... or separate the last .hdr file into a single batch.
+                (j + 1 == total_files && !batch_files.empty())) {
+                break;
+            }
+            batch_files.push_back(file_info_list[j]);
+            batch_file_size += file_info_list[j].second;
+        }
+
+        // check disk capacity
+        if (data_dir->reach_capacity_limit(batch_file_size)) {
+            return Status::Error<EXCEEDED_LIMIT>(
+                    "reach the capacity limit of path {}, file_size={}", 
data_dir->path(),
+                    batch_file_size);
+        }
+
+        RETURN_IF_ERROR(download_files_v2(address, token, remote_dir, 
local_dir, batch_files));
+
+        total_file_size += batch_file_size;
+        i += batch_files.size();
+        batch_files.clear();
+    }
+
+    uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
+    total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
+    double copy_rate = 0.0;
+    if (total_time_ms > 0) {
+        copy_rate = total_file_size / ((double)total_time_ms) / 1000;
+    }
+    _copy_size = (int64_t)total_file_size;
+    _copy_time_ms = (int64_t)total_time_ms;
+    LOG(INFO) << "succeed to copy tablet " << _signature
+              << ", total files: " << file_info_list.size()
+              << ", total file size: " << total_file_size << " B, cost: " << 
total_time_ms << " ms"
+              << ", rate: " << copy_rate << " MB/s";
+
+    return Status::OK();
+}
+
 /// This method will only be called if tablet already exist in this BE when 
doing clone.
 /// This method will do the following things:
 /// 1. Link all files from CLONE dir to tablet dir if file does not exist in 
tablet dir
diff --git a/be/src/olap/task/engine_clone_task.h 
b/be/src/olap/task/engine_clone_task.h
index a11d4c742f4..e2ced28f03c 100644
--- a/be/src/olap/task/engine_clone_task.h
+++ b/be/src/olap/task/engine_clone_task.h
@@ -79,6 +79,9 @@ private:
     Status _download_files(DataDir* data_dir, const std::string& 
remote_url_prefix,
                            const std::string& local_path);
 
+    Status _batch_download_files(DataDir* data_dir, const std::string& 
endpoint,
+                                 const std::string& remote_dir, const 
std::string& local_dir);
+
     Status _make_snapshot(const std::string& ip, int port, TTableId tablet_id,
                           TSchemaHash schema_hash, int timeout_s,
                           const std::vector<Version>& missing_versions, 
std::string* snapshot_path,
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index a74f00291de..0ee484ec886 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -32,6 +32,7 @@
 #include "common/status.h"
 #include "http/action/adjust_log_level.h"
 #include "http/action/adjust_tracing_dump.h"
+#include "http/action/batch_download_action.h"
 #include "http/action/be_proc_thread_action.h"
 #include "http/action/calc_file_crc_action.h"
 #include "http/action/check_rpc_channel_action.h"
@@ -290,6 +291,16 @@ void HttpService::register_local_handler(StorageEngine& 
engine) {
                                       tablet_download_action);
     _ev_http_server->register_handler(HttpMethod::GET, 
"/api/_tablet/_download",
                                       tablet_download_action);
+
+    BatchDownloadAction* batch_download_action =
+            _pool.add(new BatchDownloadAction(_env, _rate_limit_group, 
allow_paths));
+    _ev_http_server->register_handler(HttpMethod::HEAD, 
"/api/_tablet/_batch_download",
+                                      batch_download_action);
+    _ev_http_server->register_handler(HttpMethod::GET, 
"/api/_tablet/_batch_download",
+                                      batch_download_action);
+    _ev_http_server->register_handler(HttpMethod::POST, 
"/api/_tablet/_batch_download",
+                                      batch_download_action);
+
     if (config::enable_single_replica_load) {
         DownloadAction* single_replica_download_action = _pool.add(new 
DownloadAction(
                 _env, nullptr, allow_paths, 
config::single_replica_load_download_num_workers));
diff --git a/be/test/http/http_client_test.cpp 
b/be/test/http/http_client_test.cpp
index c98328d7c8e..84e4d259ff5 100644
--- a/be/test/http/http_client_test.cpp
+++ b/be/test/http/http_client_test.cpp
@@ -25,6 +25,7 @@
 #include <unistd.h>
 
 #include <boost/algorithm/string/predicate.hpp>
+#include <filesystem>
 
 #include "gtest/gtest_pred_impl.h"
 #include "http/ev_http_server.h"
@@ -102,14 +103,32 @@ public:
     }
 };
 
+class HttpBatchDownloadFileHandler : public HttpHandler {
+public:
+    void handle(HttpRequest* req) override {
+        if (req->param("check") == "true") {
+            HttpChannel::send_reply(req, "OK");
+        } else if (req->param("list") == "true") {
+            do_dir_response(req->param("dir"), req, true);
+        } else {
+            std::vector<std::string> acquire_files =
+                    strings::Split(req->get_request_body(), "\n", 
strings::SkipWhitespace());
+            HttpChannel::send_files(req, req->param("dir"), acquire_files);
+        }
+    }
+};
+
 static EvHttpServer* s_server = nullptr;
 static int real_port = 0;
 static std::string hostname = "";
+static std::string address = "";
+constexpr std::string_view TMP_DIR = "./http_test_tmp";
 
 static HttpClientTestSimpleGetHandler s_simple_get_handler;
 static HttpClientTestSimplePostHandler s_simple_post_handler;
 static HttpNotFoundHandler s_not_found_handler;
 static HttpDownloadFileHandler s_download_file_handler;
+static HttpBatchDownloadFileHandler s_batch_download_file_handler;
 
 class HttpClientTest : public testing::Test {
 public:
@@ -123,10 +142,17 @@ public:
         s_server->register_handler(POST, "/simple_post", 
&s_simple_post_handler);
         s_server->register_handler(GET, "/not_found", &s_not_found_handler);
         s_server->register_handler(HEAD, "/download_file", 
&s_download_file_handler);
+        s_server->register_handler(HEAD, "/api/_tablet/_batch_download",
+                                   &s_batch_download_file_handler);
+        s_server->register_handler(GET, "/api/_tablet/_batch_download",
+                                   &s_batch_download_file_handler);
+        s_server->register_handler(POST, "/api/_tablet/_batch_download",
+                                   &s_batch_download_file_handler);
         static_cast<void>(s_server->start());
         real_port = s_server->get_real_port();
         EXPECT_NE(0, real_port);
-        hostname = "http://127.0.0.1:" + std::to_string(real_port);
+        address = "127.0.0.1:" + std::to_string(real_port);
+        hostname = "http://" + address;
     }
 
     static void TearDownTestCase() { delete s_server; }
@@ -571,4 +597,74 @@ TEST_F(HttpClientTest, enable_http_auth) {
     }
 }
 
+TEST_F(HttpClientTest, batch_download) {
+    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(TMP_DIR).ok());
+    EXPECT_TRUE(io::global_local_filesystem()->create_directory(TMP_DIR).ok());
+
+    std::string root_dir(TMP_DIR);
+    std::string remote_related_dir = root_dir + "/source";
+    std::string local_dir = root_dir + "/target";
+    
EXPECT_TRUE(io::global_local_filesystem()->create_directory(remote_related_dir).ok());
+    
EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_dir).ok());
+
+    std::string remote_dir;
+    
EXPECT_TRUE(io::global_local_filesystem()->canonicalize(remote_related_dir, 
&remote_dir).ok());
+
+    // 0. create dir source and prepare a large file exceeds 1MB
+    {
+        std::string large_file = remote_dir + "/a_large_file";
+        int fd = open(large_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+        ASSERT_TRUE(fd >= 0);
+        std::string buf = "0123456789";
+        for (int i = 0; i < 10; i++) {
+            buf += buf;
+        }
+        for (int i = 0; i < 1024; i++) {
+            ASSERT_TRUE(write(fd, buf.c_str(), buf.size()) > 0);
+        }
+        close(fd);
+
+        // create some small files.
+        for (int i = 0; i < 32; i++) {
+            std::string small_file = remote_dir + "/small_file_" + 
std::to_string(i);
+            fd = open(small_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+            ASSERT_TRUE(fd >= 0);
+            ASSERT_TRUE(write(fd, buf.c_str(), buf.size()) > 0);
+            close(fd);
+        }
+
+        // create a empty file
+        std::string empty_file = remote_dir + "/empty_file";
+        fd = open(empty_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+        ASSERT_TRUE(fd >= 0);
+        close(fd);
+
+        empty_file = remote_dir + "/zzzz";
+        fd = open(empty_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+        ASSERT_TRUE(fd >= 0);
+        close(fd);
+    }
+
+    // 1. check remote support batch download
+    Status st = is_support_batch_download(address);
+    EXPECT_TRUE(st.ok());
+
+    // 2. list remote files
+    std::vector<std::pair<std::string, size_t>> file_info_list;
+    st = list_remote_files_v2(address, "token", remote_dir, &file_info_list);
+    EXPECT_TRUE(st.ok());
+
+    // 3. download files
+    if (file_info_list.size() > 64) {
+        file_info_list.resize(64);
+    }
+
+    // sort file info list by file name
+    std::sort(file_info_list.begin(), file_info_list.end(),
+              [](const auto& a, const auto& b) { return a.first < b.first; });
+
+    st = download_files_v2(address, "token", remote_dir, local_dir, 
file_info_list);
+    EXPECT_TRUE(st.ok());
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to